in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/BaseStreamReplicatorTests.java [491:532]
/**
 * Starts a {@link QueryReplicator} for {@code queryRequest} on a dedicated thread and polls
 * ClickHouse until {@code getCount(tableIdentity)} equals {@code expectedMessageCount}; the
 * replicator is then stopped and its thread is joined.
 *
 * <p>The replicator is stopped and joined even when polling ends with an exception, so a failed
 * check does not leave the replicator thread running. If the calling thread is interrupted,
 * polling ends early and the interrupt status is preserved for the caller.
 *
 * @param tableIdentity        ClickHouse table whose existence and count are polled
 * @param expectedMessageCount count at which polling stops (exact match)
 * @param flushMessageCount    passed through to the {@link QueryReplicator} constructor
 * @param flushTimeoutMs       passed through to the {@link QueryReplicator} constructor
 * @param queryRequest         request handed to the replicator; its key is used in the thread name
 * @throws RuntimeException if the replicator thread terminates before the expected count is observed
 */
protected void startQueryReplication(final TableIdentity tableIdentity, final int expectedMessageCount,
                                     final int flushMessageCount, final long flushTimeoutMs, QueryRequest queryRequest) {
    QueryReplicator replicator = new QueryReplicator(queryRequest, tickDB, clickhouseClient, clickhouseProperties, flushMessageCount, flushTimeoutMs, streamReplicator -> {});
    Thread replicatorThread = new Thread(replicator, String.format("Stream replicator '%s'", queryRequest.getKey()));
    replicatorThread.start();

    try {
        // Poll ClickHouse until the expected count has been replicated.
        boolean isTableExists = clickhouseClient.existsTable(tableIdentity);
        int messageCount = 0;
        do {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            // A terminated replicator thread can no longer deliver the remaining messages.
            if (replicatorThread.getState() == Thread.State.TERMINATED) {
                throw new RuntimeException("Replication failed");
            }
            // The count is only queried once the table has been seen to exist.
            if (isTableExists) {
                messageCount = getCount(tableIdentity);
            } else {
                isTableExists = clickhouseClient.existsTable(tableIdentity);
            }
        } while (expectedMessageCount != messageCount);
    } finally {
        // Always stop and join, including on failure paths, so the thread is not leaked.
        replicator.stop();
        try {
            LOG.info()
                    .append("Joining thread '")
                    .append(replicatorThread.getName())
                    .append("'")
                    .commit();
            replicatorThread.join();
        } catch (InterruptedException e) {
            // log and continue
            LOG.info()
                    .append("Thread '")
                    .append(replicatorThread.getName())
                    .append("' interrupted.")
                    .commit();
            // join() cleared the interrupt status when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        }
    }
}