protected void startStreamReplication()

in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/BaseStreamReplicatorTests.java [441:489]


    /**
     * Runs a {@link StreamReplicator} for {@code stream} on a dedicated thread and blocks until
     * {@code getCount(tableIdentity)} equals {@code expectedMessageCount}, then stops the replicator
     * and joins its thread.
     *
     * <p>Polling ends early if the calling thread is interrupted; in that case the replicator is still
     * stopped and the caller's interrupt status is preserved on return. If polling fails with an
     * exception, a replicator thread that is still alive is stopped and joined before the exception
     * propagates, so the background thread is not leaked.
     *
     * @param stream               stream whose key is used for the default request and the thread name
     * @param tableIdentity        table that is checked for existence and then counted
     * @param expectedMessageCount count at which polling stops
     * @param flushMessageCount    passed through to the {@link StreamReplicator} constructor
     * @param flushTimeoutMs       passed through to the {@link StreamReplicator} constructor
     * @param streamRequest        request to replicate with; when {@code null}, a default request is built
     *                             from the stream key with {@link ColumnNamingScheme#TYPE_AND_NAME} and the
     *                             partition column included
     * @throws RuntimeException if the replicator thread terminates before the expected count is observed
     */
    protected void startStreamReplication(final TickStream stream, final TableIdentity tableIdentity, final int expectedMessageCount,
                                          final int flushMessageCount, final long flushTimeoutMs, StreamRequest streamRequest) {
        if (streamRequest == null) {
            streamRequest = new StreamRequest();
            streamRequest.setKey(stream.getKey());
            streamRequest.setStream(stream.getKey());
            streamRequest.setColumnNamingScheme(ColumnNamingScheme.TYPE_AND_NAME);
            streamRequest.setIncludePartitionColumn(true);
        }
        StreamReplicator replicator = new StreamReplicator(streamRequest, tickDB, clickhouseClient, clickhouseProperties, flushMessageCount, flushTimeoutMs, streamReplicator -> {});
        Thread replicatorThread = new Thread(replicator, String.format("Stream replicator '%s'", stream.getKey()));

        replicatorThread.start();

        // Tracks whether polling ended without an exception (including the interrupted early exit).
        boolean pollingCompleted = false;
        try {
            // poll from CH until not replicated
            boolean isTableExists = clickhouseClient.existsTable(tableIdentity);
            int messageCount = 0;
            do {
                if (Thread.currentThread().isInterrupted())
                    break;

                if (replicatorThread.getState() == Thread.State.TERMINATED) {
                    throw new RuntimeException("Replication failed");
                }

                // The table is created by the replicator, so only start counting once it exists.
                if (isTableExists)
                    messageCount = getCount(tableIdentity);
                else
                    isTableExists = clickhouseClient.existsTable(tableIdentity);
            } while (expectedMessageCount != messageCount);
            pollingCompleted = true;
        } finally {
            // Normal path: always stop and join, as before. Failure path: only touch a replicator that
            // is still running, so the background thread is not leaked when polling throws.
            if (pollingCompleted || replicatorThread.isAlive()) {
                replicator.stop();
                try {
                    LOG.info()
                            .append("Joining thread '")
                            .append(replicatorThread.getName())
                            .append("'")
                            .commit();

                    replicatorThread.join();
                } catch (InterruptedException e) {
                    // log and continue
                    LOG.info()
                            .append("Thread '")
                            .append(replicatorThread.getName())
                            .append("' interrupted.")
                            .commit();
                    // join() cleared the interrupt flag when it threw; restore it so callers can observe it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }