protected void startQueryReplication()

in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/BaseStreamReplicatorTests.java [491:532]


    /**
     * Runs a {@link QueryReplicator} for {@code queryRequest} on a dedicated thread and blocks
     * until the count reported by {@code getCount(tableIdentity)} equals
     * {@code expectedMessageCount}; then stops the replicator and joins its thread.
     *
     * <p>The replicator is stopped and joined on every exit path, including when polling
     * fails, so a failing test does not leave the replicator thread running. The polling
     * loop has no timeout of its own: it ends only when the expected count is observed,
     * the calling thread is interrupted, or the replicator thread terminates.
     *
     * @param tableIdentity        ClickHouse table that is polled for existence and message count
     * @param expectedMessageCount count at which replication is considered complete
     * @param flushMessageCount    passed through to the {@link QueryReplicator} constructor
     * @param flushTimeoutMs       passed through to the {@link QueryReplicator} constructor
     * @param queryRequest         request handed to the replicator; its key is used in the thread name
     * @throws RuntimeException if the replicator thread terminates before the expected count is observed
     */
    protected void startQueryReplication(final TableIdentity tableIdentity, final int expectedMessageCount,
                                          final int flushMessageCount, final long flushTimeoutMs, QueryRequest queryRequest) {
        QueryReplicator replicator = new QueryReplicator(queryRequest, tickDB, clickhouseClient, clickhouseProperties, flushMessageCount, flushTimeoutMs, streamReplicator -> {});
        Thread replicatorThread = new Thread(replicator, String.format("Stream replicator '%s'", queryRequest.getKey()));

        replicatorThread.start();

        try {
            // Poll ClickHouse until the expected number of messages has been replicated.
            boolean isTableExists = clickhouseClient.existsTable(tableIdentity);
            int messageCount = 0;
            do {
                // Leave the loop on interruption; the flag stays set for the caller.
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }

                // The replicator is only stopped below, so a terminated thread here means it died early.
                if (replicatorThread.getState() == Thread.State.TERMINATED) {
                    throw new RuntimeException("Replication failed");
                }

                // The table may not exist yet; only count once it has been seen.
                if (isTableExists) {
                    messageCount = getCount(tableIdentity);
                } else {
                    isTableExists = clickhouseClient.existsTable(tableIdentity);
                }
            } while (expectedMessageCount != messageCount);
        } finally {
            // Always shut the replicator down, even if polling threw, to avoid leaking its thread.
            replicator.stop();
            try {
                LOG.info()
                        .append("Joining thread '")
                        .append(replicatorThread.getName())
                        .append("'")
                        .commit();

                replicatorThread.join();
            } catch (InterruptedException e) {
                // log and continue
                LOG.info()
                        .append("Thread '")
                        .append(replicatorThread.getName())
                        .append("' interrupted.")
                        .commit();
                // Restore the interrupt status so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }