in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timebase/TimebaseStreamReplicationService.java [130:150]
private void transferData(DXTickStream stream, long lastTimestamp, String insertQuery, TimescaleSchema schema) {
DataFeeder<RawMessage> dataFeeder = new RawMessageDataFeeder(stream, lastTimestamp, migrationService);
try {
while (true) {
List<RawMessage> messages = dataFeeder.fetchData(500);
if (!messages.isEmpty()) {
LOG.info().append("Try to save: ").append(messages.size()).append(" raws from stream - ").append(stream.getName()).commit();
timescaleDataService.insertBatch(insertQuery, messages, schema);
} else {
Thread.currentThread().sleep(500l);
}
}
} catch (Exception ex) {
dataFeeder.close();
throw new RuntimeException(ex);
}
}