public void testTimebaseStreamReplication()

in timescaledb-connector/src/integration-test/java/com/epam/deltix/timebase/connector/service/timebase/TimebaseStreamReplicationServiceTest.java [102:137]


    public void testTimebaseStreamReplication() {
        // open stream in readOnly mode
        DXTickStream stream = connectionService.getStream("someStreamName");
        // generate schema
        TimescaleSchema schema = timescaleSchemaDefinition.getTimebaseSchemaDefinition(stream);

        // apply generated schema to TimescaleDB
        timebaseStreamReplicationService.generateHyperTable(schema);
        // get last message timestamp
        long lastReplicatedTimestamp = timebaseStreamReplicationService.getLastReplicatedTimestamp(schema);
        // generate insert query
        String insertQuery = timebaseStreamReplicationService.generateInsertStatement(schema);

        // create TimeBase data feeder
        DataFeeder<RawMessage> dataFeeder = new RawMessageDataFeeder(stream, lastReplicatedTimestamp, migrationService);

        // start replication
        int replicatedMessageCount = 0;
        while (replicatedMessageCount < 2) {
            List<RawMessage> messages = dataFeeder.fetchData(500);

            if (!messages.isEmpty()) {
                dataService.insertBatch(insertQuery, messages, schema);
                replicatedMessageCount += messages.size();
            }
        }

        // get initial TimeBase entities
        List<BaseTestClass> timeBaseEntities = getTimeBaseEntities();
        // get replicated entities from TimescaleDB
        List<BaseTestClass> timescaleEntities = getTimescaleEntities();

        // assert entities
        assertFirstTestClass(timeBaseEntities, timescaleEntities);
        assertSecondTestClass(timeBaseEntities, timescaleEntities);
    }