in timescaledb-connector/src/integration-test/java/com/epam/deltix/timebase/connector/service/timebase/TimebaseStreamReplicationServiceTest.java [102:137]
public void testTimebaseStreamReplication() {
// open stream in readOnly mode
DXTickStream stream = connectionService.getStream("someStreamName");
// generate schema
TimescaleSchema schema = timescaleSchemaDefinition.getTimebaseSchemaDefinition(stream);
// apply generated schema to TimescaleDB
timebaseStreamReplicationService.generateHyperTable(schema);
// get last message timestamp
long lastReplicatedTimestamp = timebaseStreamReplicationService.getLastReplicatedTimestamp(schema);
// generate insert query
String insertQuery = timebaseStreamReplicationService.generateInsertStatement(schema);
// create TimeBase data feeder
DataFeeder<RawMessage> dataFeeder = new RawMessageDataFeeder(stream, lastReplicatedTimestamp, migrationService);
// start replication
int replicatedMessageCount = 0;
while (replicatedMessageCount < 2) {
List<RawMessage> messages = dataFeeder.fetchData(500);
if (!messages.isEmpty()) {
dataService.insertBatch(insertQuery, messages, schema);
replicatedMessageCount += messages.size();
}
}
// get initial TimeBase entities
List<BaseTestClass> timeBaseEntities = getTimeBaseEntities();
// get replicated entities from TimescaleDB
List<BaseTestClass> timescaleEntities = getTimescaleEntities();
// assert entities
assertFirstTestClass(timeBaseEntities, timescaleEntities);
assertSecondTestClass(timeBaseEntities, timescaleEntities);
}