public void run()

in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/algos/StreamReplicator.java [64:162]


    /**
     * Replicates the stream named by {@code request} into ClickHouse.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>prepare the target ClickHouse tables from the stream schema;</li>
     *   <li>in {@link WriteMode#APPEND} mode, look up the last timestamp via
     *       {@code findLastTimestamp}, call {@code truncateData} with it, and start reading
     *       from it; otherwise read from {@code Long.MIN_VALUE};</li>
     *   <li>read raw messages from a cursor and hand them to {@code tableWriter}, calling
     *       {@code flush()} when the batch reaches {@code flushMessageCount} messages or
     *       {@code flushTimeoutMs} has elapsed since {@code lastFlushTimestamp};</li>
     *   <li>stop when {@code cursor.next()} returns {@code false} or {@code cancel} is set.</li>
     * </ol>
     *
     * <p>Any {@link Throwable} raised during replication is logged and ends the run. The table
     * writer is closed and {@code onStopped} is always notified on exit. If the thread was
     * interrupted while waiting for data, the loop keeps running (as before), and the interrupt
     * status is restored just before this method returns.
     */
    public void run() {
        long count = 0;
        // Progress is logged every reportThreshold messages. Computed in long to avoid int
        // overflow and clamped to >= 1 so that "count % reportThreshold" can never divide by zero.
        final int reportThreshold = (int) Math.max(1L, Math.min(flushMessageCount * 10L, 1_000_000L));
        // Remembers an interrupt received while waiting so it can be re-asserted on exit.
        boolean interrupted = false;
        try {
            DXTickStream stream = tickDb.getStream(request.getStream());

            SchemaOptions schemaOptions = getSchemaOptions(stream);
            SchemaProcessor schemaProcessor = new SchemaProcessor(schemaOptions, clickhouseClient, clickhouseProperties);

            LOG.info()
                    .append("Replication ")
                    .append(request.getKey())
                    .append(": prepare target table schema.")
                    .commit();
            Map<String, TableDeclaration> clickhouseTables = schemaProcessor.prepareClickhouseTable();

            long from = Long.MIN_VALUE;
            if (WriteMode.APPEND == request.getWriteMode()) {
                // Resume from the last timestamp found in the target tables.
                // NOTE(review): truncateData presumably drops rows at/after "from" so that the
                // boundary timestamp is not duplicated on re-read — confirm against its implementation.
                from = findLastTimestamp(clickhouseTables.values());
                truncateData(clickhouseTables.values(), from);
            }

            SelectionOptions selectionOptions = new SelectionOptions(true, true);

            MemoryDataInput in = new MemoryDataInput();
            RecordClassDescriptor[] descriptors = schemaOptions.getTbSchema().getContentClasses();
            List<UnboundDecoder> decoders = Arrays.stream(descriptors)
                    .map(CodecFactory.COMPILED::createFixedUnboundDecoder)
                    .collect(Collectors.toList());

            tableWriter = new UnboundTableWriter(request.getKey(), request.getColumnNamingScheme(), clickhouseClient,
                    clickhouseTables, schemaProcessor.getColumnDeclarations(), decoders, in);
            if (!request.getIncludePartitionColumn()) {
                tableWriter.removeFixedColumn(SchemaProcessor.PARTITION_COLUMN_NAME);
            }

            try (TickCursor cursor = stream.select(from, selectionOptions)) {
                // making live cursor non-blocking
                cursor.setAvailabilityListener(this::notifyDataAvailable);

                while (!cancel) {
                    if (tableWriter.getBatchMsgCount() > 0) { // we have messages in queue
                        if (tableWriter.getBatchMsgCount() >= flushMessageCount || // batch size reached
                                TimeKeeper.currentTime >= lastFlushTimestamp + flushTimeoutMs) { // flush interval reached
                            flush();
                        }
                    }

                    synchronized (unblockingCursorLock) {
                        try {
                            if (!cursor.next()) {
                                break; // cursor exhausted
                            }
                            tableWriter.send((RawMessage) cursor.getMessage(), cursor);
                            count++;

                            // Report only right after a message was actually written; checking
                            // on idle wake-ups would repeat the same line while count is unchanged.
                            if (count % reportThreshold == 0) {
                                LOG.info().append("Replication ").append(request.getKey())
                                        .append(": write ").append(count).append(" messages.").commit();
                            }
                        } catch (UnavailableResourceException e) {
                            // Raised from the try block above; presumably means the non-blocking
                            // cursor has no data right now. Wait on the lock until the remaining
                            // flush interval elapses (notifyDataAvailable is expected to wake us
                            // earlier — confirm).
                            long timeout = flushTimeoutMs - (TimeKeeper.currentTime - lastFlushTimestamp);
                            if (timeout > 0) {
                                try {
                                    unblockingCursorLock.wait(timeout);
                                } catch (InterruptedException ie) {
                                    // Keep replicating as before, but do not lose the interrupt:
                                    // it is restored once run() has finished its cleanup.
                                    interrupted = true;
                                }
                            } else {
                                // Interval already elapsed: restart it so the next idle pass
                                // waits instead of spinning.
                                lastFlushTimestamp = TimeKeeper.currentTime;
                            }
                        }
                    }
                }
            }
            LOG.info()
                    .append("Replication ")
                    .append(request.getKey())
                    .append(": read process finished. Stopping.")
                    .commit();

            // No final flush() here: slice might be incomplete.
        } catch (Throwable e) {
            LOG.error()
                    .append("Replication ")
                    .append(request.getKey())
                    .append(": unhandled exception during replication process. Stopping.")
                    .append(e)
                    .commit();
        } finally {
            try {
                if (tableWriter != null) {
                    tableWriter.close();
                }
            } catch (Throwable e) {
                // A failing close must not prevent the stop notification below.
                LOG.error()
                        .append("Replication ")
                        .append(request.getKey())
                        .append(": failed to close table writer.")
                        .append(e)
                        .commit();
            } finally {
                onStopped.accept(this);
                LOG.info()
                        .append("Replication ")
                        .append(request.getKey())
                        .append(": stopped. Replicated ")
                        .append(count).append(" messages.")
                        .commit();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }