in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timebase/TimebaseStreamReplicationService.java [57:82]
/**
 * Replicates the stream with the given name into its target table.
 *
 * <p>Steps, in order: look up the stream through {@code connectionService}, derive its
 * {@link TimescaleSchema}, call {@code generateHyperTable} for that schema, create migration
 * metadata when none is stored for the stream yet, determine the last replicated timestamp,
 * build the insert statement and finally hand everything to {@code transferData}.
 *
 * <p>This method does not propagate failures: any {@link Exception} raised by the steps above
 * is logged, the cached {@link StreamMetaData} (if any) is marked
 * {@link StreamMetaData.Status#FAILED}, and a {@link FailedReplicationEvent} is published.
 *
 * @param streamName name of the stream to replicate
 */
public void replicate(String streamName) {
    try {
        DXTickStream stream = connectionService.getStream(streamName);
        TimescaleSchema schema = timebaseSchemaDefinition.getTimebaseSchemaDefinition(stream);
        generateHyperTable(schema);

        // Migration metadata is only created on first sight of the stream.
        // TODO handle version mismatch when metadata already exists
        if (!migrationMetadataService.getByStreamName(streamName).isPresent()) {
            generateMigrationMetadata(streamName, stream.getDataVersion());
        }

        long lastTimestamp = getLastReplicatedTimestamp(schema);
        String insertQuery = generateInsertStatement(schema);
        transferData(stream, lastTimestamp, insertQuery, schema);
    } catch (Exception ex) {
        LOG.error().append("Failed to replicate stream: ").append(streamName).append(ex).commit();

        // The cache may hold no entry for this stream (e.g. the failure happened before one was
        // registered). Guard the lookup so a secondary NullPointerException cannot escape this
        // handler and suppress the failure event below.
        StreamMetaData metaData = streamMetaDataCacheService.get(streamName);
        if (metaData != null) {
            metaData.updateStatus(StreamMetaData.Status.FAILED);
        } else {
            LOG.error().append("No cached metadata to mark as FAILED for stream: ").append(streamName).commit();
        }

        eventPublisher.publishEvent(new FailedReplicationEvent(streamName));
    }
}