private void resetToOffset()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [117:145]


    /**
     * Scans forward through the cursor looking for the last recorded message.
     *
     * <p>Only messages whose timestamp equals {@code lastMessageTimestamp} are
     * examined; the scan stops at the first message with a different timestamp
     * or when the cursor has no more messages. {@code lastMessageCounter} is
     * incremented once for every examined message.
     *
     * <p>How a message is matched depends on {@code tbMessageId}:
     * <ul>
     *   <li>when it is {@code null}, {@code offset} is compared with the running
     *       value of {@code lastMessageCounter};</li>
     *   <li>otherwise, {@code offset} is compared with the value stored under the
     *       {@code tbMessageId} key in the message's deserialized values.</li>
     * </ul>
     *
     * <p>On a match the method returns immediately, leaving the cursor where the
     * scan stopped. If no match is found, a warning is logged, the cursor is
     * reset to {@code lastMessageTimestamp} and {@code lastMessageCounter} is
     * set back to zero.
     *
     * @param offset the recorded offset to look for: a message count or a
     *               message ID, depending on whether {@code tbMessageId} is set
     */
    private void resetToOffset(long offset) {
        while (cursor.next()) {
            final RawMessage current = (RawMessage) cursor.getMessage();

            // Past the block of messages sharing the recorded timestamp: give up.
            if (current.getTimeStampMs() != lastMessageTimestamp) {
                break;
            }

            lastMessageCounter++;

            if (tbMessageId != null) {
                // Match on the explicit message-ID field.
                // NOTE(review): the cast assumes the deserializer yields a Long for
                // this field; confirm against the deserializer implementation.
                final Map<String,Object> fields = deserializer.getValues(current);
                final Long recordedId = (Long) fields.get(tbMessageId);
                if (recordedId != null && recordedId == offset) {
                    LOG.info("Found last recorded message with ID: " + offset);
                    return;
                }
            }
            else if (lastMessageCounter == offset) {
                // No ID field configured: match on the position within the timestamp.
                LOG.info("Found last recorded message: " + offset);
                return;
            }
        }

        // Nothing matched: restart from the beginning of the recorded timestamp.
        LOG.warn("Last message with offset " + offset + " not found. Starting at timestamp: " + lastMessageTimestamp);
        cursor.reset(lastMessageTimestamp);
        lastMessageCounter = 0;
    }