in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/services/ReplicatorService.java [166:206]
public void startReplicators() {
ArrayList<ReplicationRequest> requests = new ArrayList<>(replicationProperties.getStreams());
requests.addAll(replicationProperties.getQueries());
for (ReplicationRequest request : requests) {
if (request.getIncludePartitionColumn() == null) {
request.setIncludePartitionColumn(replicationProperties.isIncludePartitionColumn());
}
if (request.getColumnNamingScheme() == null) {
request.setColumnNamingScheme(replicationProperties.getColumnNamingScheme());
}
}
List<QueryRequest> queryRequests = replicationProperties.getQueries();
List<StreamRequest> streamRequests = replicationProperties.getStreams();
validateRequests(queryRequests, streamRequests);
Set<String> runningReplicationKeys = replicators.keySet().stream().map(Replicator::getKey).collect(Collectors.toSet());
streamRequests.stream()
.filter(sr -> !runningReplicationKeys.contains(sr.getKey()))
.forEach(streamRequest -> {
Replicator replicator = new StreamReplicator(streamRequest, tickDb, clickhouseClient, clickhouseProperties,
replicationProperties.getFlushMessageCount(),
replicationProperties.getFlushTimeoutMs(), this::onReplicatorStopped);
Thread replicatorThread = new Thread(replicator, String.format("Stream replicator '%s'", replicator.getKey()));
replicators.put(replicator, replicatorThread);
replicatorThread.start();
});
queryRequests.stream()
.filter(qr -> !runningReplicationKeys.contains(qr.getKey()))
.forEach(queryRequest -> {
Replicator replicator = new QueryReplicator(queryRequest, tickDb, clickhouseClient, clickhouseProperties,
replicationProperties.getFlushMessageCount(),
replicationProperties.getFlushTimeoutMs(), this::onReplicatorStopped);
Thread replicatorThread = new Thread(replicator, String.format("Query replicator '%s'", replicator.getKey()));
replicators.put(replicator, replicatorThread);
replicatorThread.start();
});
}