public void startReplicators()

in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/services/ReplicatorService.java [166:206]


    /**
     * Starts a replicator thread for every configured stream and query request whose key is
     * not already present among the replicators held in {@code replicators}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Every request that has no {@code includePartitionColumn} or
     *       {@code columnNamingScheme} value receives the corresponding value from
     *       {@code replicationProperties}.</li>
     *   <li>The query and stream requests are handed to {@code validateRequests}.</li>
     *   <li>The keys of the replicators currently in {@code replicators} are collected once;
     *       any request whose key appears in that snapshot is skipped.</li>
     *   <li>Stream requests are launched first, followed by query requests.</li>
     * </ol>
     */
    public void startReplicators() {
        // Streams first, then queries: same ordering as the launch phase below.
        List<ReplicationRequest> allRequests = new ArrayList<>(replicationProperties.getStreams());
        allRequests.addAll(replicationProperties.getQueries());
        allRequests.forEach(pending -> {
            if (pending.getIncludePartitionColumn() == null) {
                pending.setIncludePartitionColumn(replicationProperties.isIncludePartitionColumn());
            }
            if (pending.getColumnNamingScheme() == null) {
                pending.setColumnNamingScheme(replicationProperties.getColumnNamingScheme());
            }
        });

        List<QueryRequest> queries = replicationProperties.getQueries();
        List<StreamRequest> streams = replicationProperties.getStreams();
        validateRequests(queries, streams);

        // Snapshot taken once, before anything is launched; replicators started by this call
        // are therefore not reflected in it.
        Set<String> activeKeys = replicators.keySet()
                .stream()
                .map(running -> running.getKey())
                .collect(Collectors.toSet());

        for (StreamRequest streamRequest : streams) {
            if (activeKeys.contains(streamRequest.getKey())) {
                continue;
            }
            Replicator streamReplicator = new StreamReplicator(
                    streamRequest, tickDb, clickhouseClient, clickhouseProperties,
                    replicationProperties.getFlushMessageCount(),
                    replicationProperties.getFlushTimeoutMs(),
                    this::onReplicatorStopped);
            launchReplicator(streamReplicator,
                    String.format("Stream replicator '%s'", streamReplicator.getKey()));
        }

        for (QueryRequest queryRequest : queries) {
            if (activeKeys.contains(queryRequest.getKey())) {
                continue;
            }
            Replicator queryReplicator = new QueryReplicator(
                    queryRequest, tickDb, clickhouseClient, clickhouseProperties,
                    replicationProperties.getFlushMessageCount(),
                    replicationProperties.getFlushTimeoutMs(),
                    this::onReplicatorStopped);
            launchReplicator(queryReplicator,
                    String.format("Query replicator '%s'", queryReplicator.getKey()));
        }
    }

    /**
     * Creates a thread with the given name for the replicator, records the pair in
     * {@code replicators}, and then starts the thread.
     *
     * @param replicator the replicator to run
     * @param threadName the name given to the new thread
     */
    private void launchReplicator(Replicator replicator, String threadName) {
        Thread worker = new Thread(replicator, threadName);

        // Register before starting, matching the original put-then-start order.
        replicators.put(replicator, worker);
        worker.start();
    }