public void replicate()

in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timebase/TimebaseStreamReplicationService.java [57:82]


    /**
     * Runs one replication pass for the named stream.
     *
     * <p>Steps, in order: resolve the stream through {@code connectionService}, derive its
     * {@link TimescaleSchema}, call {@code generateHyperTable} for that schema, create migration
     * metadata if none is recorded for the stream yet, read the last replicated timestamp, build
     * the insert statement and finally hand everything to {@code transferData}.
     *
     * <p>This method does not propagate failures of those steps: any {@link Exception} is logged,
     * the cached {@link StreamMetaData} (when one exists) is switched to
     * {@link StreamMetaData.Status#FAILED}, and a {@link FailedReplicationEvent} is published.
     *
     * @param streamName name of the stream to replicate
     */
    public void replicate(String streamName) {
        try {
            DXTickStream stream = connectionService.getStream(streamName);

            TimescaleSchema schema = timebaseSchemaDefinition.getTimebaseSchemaDefinition(stream);

            generateHyperTable(schema);

            if (!migrationMetadataService.getByStreamName(streamName).isPresent()) {
                // First time this stream is seen: record the data version it is replicated with.
                generateMigrationMetadata(streamName, stream.getDataVersion());
            } else {
                // TODO handle version mismatch
            }

            // Starting point for the transfer; presumably allows resuming after a previous run.
            long lastTimestamp = getLastReplicatedTimestamp(schema);

            String insertQuery = generateInsertStatement(schema);

            transferData(stream, lastTimestamp, insertQuery, schema);
        } catch (Exception ex) {
            LOG.error().append("Failed to replicate stream: ").append(streamName).append(ex).commit();
            StreamMetaData metaData = streamMetaDataCacheService.get(streamName);
            // The cache may have no entry for this stream (for example when the stream lookup
            // itself failed). Guard the dereference so that a missing entry cannot raise an NPE
            // from inside the failure handler and suppress the failure event below.
            if (metaData != null) {
                metaData.updateStatus(StreamMetaData.Status.FAILED);
            }
            eventPublisher.publishEvent(new FailedReplicationEvent(streamName));
        }
    }