in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/BaseStreamReplicatorTests.java [441:489]
protected void startStreamReplication(final TickStream stream, final TableIdentity tableIdentity, final int expectedMessageCount,
final int flushMessageCount, final long flushTimeoutMs, StreamRequest streamRequest) {
if (streamRequest == null) {
streamRequest = new StreamRequest();
streamRequest.setKey(stream.getKey());
streamRequest.setStream(stream.getKey());
streamRequest.setColumnNamingScheme(ColumnNamingScheme.TYPE_AND_NAME);
streamRequest.setIncludePartitionColumn(true);
}
StreamReplicator replicator = new StreamReplicator(streamRequest, tickDB, clickhouseClient, clickhouseProperties, flushMessageCount, flushTimeoutMs, streamReplicator -> {});
Thread replicatorThread = new Thread(replicator, String.format("Stream replicator '%s'", stream.getKey()));
replicatorThread.start();
// poll from CH until not replicated
boolean isTableExists = clickhouseClient.existsTable(tableIdentity);
int messageCount = 0;
do {
if (Thread.currentThread().isInterrupted())
break;
if (replicatorThread.getState() == Thread.State.TERMINATED){
throw new RuntimeException("Replication failed");
}
if (isTableExists)
messageCount = getCount(tableIdentity);
else
isTableExists = clickhouseClient.existsTable(tableIdentity);
} while (expectedMessageCount != messageCount);
replicator.stop();
try {
LOG.info()
.append("Joining thread '")
.append(replicatorThread.getName())
.append("'")
.commit();
replicatorThread.join();
} catch (InterruptedException e) {
// log and continue
LOG.info()
.append("Thread '")
.append(replicatorThread.getName())
.append("' interrupted.")
.commit();
}
}