in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/algos/StreamReplicator.java [64:162]
public void run() {
    long count = 0;
    // Report progress every flushMessageCount * 10 messages, capped at 1,000,000.
    int reportThreshold = Math.min(flushMessageCount * 10, 1_000_000);

    try {
        DXTickStream stream = tickDb.getStream(request.getStream());
        SchemaOptions schemaOptions = getSchemaOptions(stream);
        SchemaProcessor schemaProcessor = new SchemaProcessor(schemaOptions, clickhouseClient, clickhouseProperties);

        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": prepare target table schema.")
                .commit();
        Map<String, TableDeclaration> clickhouseTables = schemaProcessor.prepareClickhouseTable();
        long from = Long.MIN_VALUE;
        if (WriteMode.APPEND == request.getWriteMode()) {
            // Resume from the last replicated timestamp and drop already-written rows from that
            // point onward, so the boundary slice can be re-read without creating duplicates.
            from = findLastTimestamp(clickhouseTables.values());
            truncateData(clickhouseTables.values(), from);
        }
        SelectionOptions selectionOptions = new SelectionOptions(true, true); // raw messages, live cursor
        MemoryDataInput in = new MemoryDataInput();
        RecordClassDescriptor[] descriptors = schemaOptions.getTbSchema().getContentClasses();
        List<UnboundDecoder> decoders = Arrays.stream(descriptors)
                .map(CodecFactory.COMPILED::createFixedUnboundDecoder)
                .collect(Collectors.toList());

        tableWriter = new UnboundTableWriter(request.getKey(), request.getColumnNamingScheme(), clickhouseClient,
                clickhouseTables, schemaProcessor.getColumnDeclarations(), decoders, in/*, 10_000, 5_000*/);
        if (!request.getIncludePartitionColumn()) {
            tableWriter.removeFixedColumn(SchemaProcessor.PARTITION_COLUMN_NAME);
        }
        try (TickCursor cursor = stream.select(from, selectionOptions)) {
            // Make the live cursor non-blocking: when no data is available, cursor.next() throws
            // UnavailableResourceException and the listener wakes this thread once data arrives.
            cursor.setAvailabilityListener(this::notifyDataAvailable);

            do {
                if (cancel)
                    break;

                if (tableWriter.getBatchMsgCount() > 0) { // we have messages in queue
                    if (tableWriter.getBatchMsgCount() >= flushMessageCount || // batch size reached
                            TimeKeeper.currentTime >= lastFlushTimestamp + flushTimeoutMs) // flush interval reached
                        flush();
                }

                synchronized (unblockingCursorLock) {
                    try {
                        if (cursor.next()) {
                            tableWriter.send((RawMessage) cursor.getMessage(), cursor);
                            count++;
                        } else {
                            break; // cursor is exhausted
                        }
                    } catch (UnavailableResourceException e) {
                        // No data yet: wait until the availability listener notifies us
                        // or the remaining flush timeout expires.
                        try {
                            long timeout = flushTimeoutMs - (TimeKeeper.currentTime - lastFlushTimestamp);
                            if (timeout > 0) {
                                unblockingCursorLock.wait(timeout);
                            } else {
                                lastFlushTimestamp = TimeKeeper.currentTime;
                            }
                        } catch (InterruptedException ie) {
                            // continue
                        }
                    }

                    if (count % reportThreshold == 0 && count > 0)
                        LOG.info().append("Replication ").append(request.getKey())
                                .append(": wrote ").append(count).append(" messages.").commit();
                }
            } while (true);
        }

        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": read process finished. Stopping.")
                .commit();
        //flush(); // slice might be incomplete
    } catch (Throwable e) {
        LOG.error()
                .append("Replication ")
                .append(request.getKey())
                .append(": unhandled exception during replication process. Stopping.")
                .append(e)
                .commit();
    } finally {
        if (tableWriter != null)
            tableWriter.close();
        onStopped.accept(this);

        LOG.info()
                .append("Replication ")
                .append(request.getKey())
                .append(": stopped. Replicated ")
                .append(count).append(" messages.")
                .commit();
    }
}
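
// ------------------------------------------------------------------------------------------------
// For context: the loop above relies on notifyDataAvailable(...) and unblockingCursorLock, which
// are declared elsewhere in StreamReplicator and are not part of this excerpt. The sketch below is
// only an assumption about how that pairing typically looks in TimeBase-based code (wake the
// replication thread when the live cursor reports new data); it is not the connector's actual
// implementation.
// ------------------------------------------------------------------------------------------------
private final Object unblockingCursorLock = new Object();   // guards cursor.next() and wait/notify

// Invoked when new messages become available on the non-blocking live cursor. Waking the waiter
// lets the loop retry cursor.next() immediately instead of sleeping out the rest of the flush
// timeout.
private void notifyDataAvailable() {
    synchronized (unblockingCursorLock) {
        unblockingCursorLock.notifyAll();
    }
}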