in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/algos/QueryReplicator.java [68:181]
/**
 * Main replication loop: executes the configured TimeBase query as a live,
 * non-blocking cursor and streams the resulting raw messages into the target
 * ClickHouse table(s) in timed/size-bounded batches. Runs until the cursor is
 * exhausted or {@code cancel} is set; always closes the writer and invokes the
 * {@code onStopped} callback, even on failure.
 */
public void run() {
    long count = 0; // messages actually handed to the table writer (skipped messages are not counted)
    // Progress-log interval: every 10 batches worth of messages, capped at 1M.
    // NOTE(review): if flushMessageCount is 0 this becomes 0 and the modulo at the
    // report check below would throw ArithmeticException — confirm config validation upstream.
    int reportThreshold = Math.min(flushMessageCount * 10, 1_000_000);
    try {
        // Build/verify the ClickHouse schema that mirrors the TimeBase stream schema.
        SchemaOptions schemaOptions = getSchemaOptions();
        SchemaProcessor schemaProcessor = new SchemaProcessor(schemaOptions, clickhouseClient, clickhouseProperties);
        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": prepare target table schema.")
                .commit();
        Map<String, TableDeclaration> clickhouseTables = schemaProcessor.prepareClickhouseTable();

        // In APPEND mode, resume from the last timestamp already present in ClickHouse.
        // Rows at that timestamp are truncated first so the re-read from `from` cannot
        // duplicate a partially written time slice.
        long from = Long.MIN_VALUE;
        if (WriteMode.APPEND == request.getWriteMode()) {
            from = findLastTimestamp(clickhouseTables.values());
            truncateData(clickhouseTables.values(), from);
        }

        // NOTE(review): presumably (raw, live) — confirm against SelectionOptions ctor.
        SelectionOptions selectionOptions = new SelectionOptions(true, true);
        MemoryDataInput in = new MemoryDataInput();
        // One unbound decoder per content class of the query result schema.
        RecordClassDescriptor[] descriptors = schemaOptions.getTbSchema().getContentClasses();
        List<UnboundDecoder> decoders = Arrays.stream(descriptors).map(CodecFactory.COMPILED::createFixedUnboundDecoder).collect(Collectors.toList());
        tableWriter = new UnboundTableWriter(request.getKey(), request.getColumnNamingScheme(), clickhouseClient,
                clickhouseTables, schemaProcessor.getColumnDeclarations(), decoders, in/*, 10_000, 5_000*/);
        if (!request.getIncludePartitionColumn()) {
            tableWriter.removeFixedColumn(SchemaProcessor.PARTITION_COLUMN_NAME);
        }

        try (InstrumentMessageSource cursor = tickDb.executeQuery(request.getQuery(), selectionOptions, null, null, from )) {
            // making live cursor non-blocking: next() throws UnavailableResourceException
            // instead of blocking, and the listener notifies unblockingCursorLock.
            cursor.setAvailabilityListener(this::notifyDataAvailable);
            do {
                if (cancel)
                    break;

                // Flush pending batch when either the size threshold or the time window is hit.
                if (tableWriter.getBatchMsgCount() > 0) { // we have messages in queue
                    if (tableWriter.getBatchMsgCount() >= flushMessageCount || // batch size reached
                            TimeKeeper.currentTime >= lastFlushTimestamp + flushTimeoutMs) // flush interval reached
                        flush();
                }

                // Lock pairs with notifyDataAvailable(): wait() below is woken when the
                // live cursor has new data.
                synchronized (unblockingCursorLock) {
                    try {
                        if (cursor.next()) {
                            RawMessage message = (RawMessage) cursor.getMessage();

                            // Skip internal query-status messages; `continue` also skips count++.
                            if (message.type.getName().equals(QUERY_STATUS_MESSAGE_TYPE)) {
                                LOG.warn().append("Message type is not allowed. Message ignored: {Type: ").append(message.type)
                                        .append(", timestamp: ").append(message.getTimeStampMs()).append(", symbol: ")
                                        .append(message.getSymbol()).append(", ").append(messageHelper.getValues(message))
                                        .append("}").commit();
                                continue;
                            }
                            // Messages without a timestamp cannot be partitioned/ordered — skip them.
                            if (message.getTimeStampMs() == Long.MIN_VALUE) {
                                LOG.warn().append("Timestamp is not defined. Message ignored: {Type: ").append(message.type).append(", timestamp: ")
                                        .append(message.getTimeStampMs()).append(", symbol: ").append(message.getSymbol())
                                        .append(", ").append(messageHelper.getValues(message)).append("}").commit();
                                continue;
                            }

                            tableWriter.send(message, cursor);
                        } else
                            break; // cursor exhausted (non-live or closed) — leave the loop

                        count++; // only reached for messages actually sent
                    } catch (UnavailableResourceException e) {
                        // Live cursor has no data right now: wait until either new data
                        // arrives (listener notify) or the flush window elapses.
                        try {
                            long timeout = flushTimeoutMs - (TimeKeeper.currentTime - lastFlushTimestamp);
                            if (timeout > 0) {
                                unblockingCursorLock.wait(timeout);
                            } else {
                                // Window already elapsed; reset so the flush check above fires next iteration.
                                lastFlushTimestamp = TimeKeeper.currentTime;
                            }
                        } catch (InterruptedException ie) {
                            // continue
                            // NOTE(review): interrupt status is swallowed, not restored via
                            // Thread.currentThread().interrupt(). Looks deliberate (loop exits
                            // via `cancel`, and re-interrupting would make the next wait() throw
                            // immediately) — confirm shutdown is always signalled through `cancel`.
                        }
                    }

                    // Periodic progress log (count is never 0 here when the modulo matches).
                    if (count % reportThreshold == 0 && count > 0)
                        LOG.info().append("Replication ").append(request.getKey())
                                .append(": write ").append(count).append(" messages.").commit();
                }
            } while (true);
        }

        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": read process finished. Stopping.")
                .commit();
        //flush(); // slice might be incomplete
        // NOTE(review): final flush intentionally disabled — presumably tableWriter.close()
        // in the finally block flushes the remaining batch; verify, otherwise the tail
        // of the stream could be lost.
    } catch (Throwable e) {
        // Boundary catch: any failure stops this replication job but must not kill the host thread pool.
        LOG.error()
                .append("Replication ")
                .append(request.getKey())
                .append(": unhandled exception during replication process. Stopping.")
                .append(e)
                .commit();
    } finally {
        // Always release ClickHouse resources and tell the owner this replicator stopped.
        if (tableWriter != null)
            tableWriter.close();

        onStopped.accept(this);

        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": stopped. Replicated ")
                .append(count).append(" messages.")
                .commit();
    }
}