in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [117:145]
/**
 * Advances the cursor to the message identified by {@code offset} among the
 * messages whose timestamp equals {@code lastMessageTimestamp}.
 *
 * <p>Every message read with the expected timestamp increments
 * {@code lastMessageCounter}. How a message is matched depends on
 * {@code tbMessageId}:
 * <ul>
 *   <li>when it is {@code null}, the message matches once the running
 *       {@code lastMessageCounter} equals {@code offset};</li>
 *   <li>otherwise the message is deserialized and matches when the value stored
 *       under the {@code tbMessageId} key is non-null and equals {@code offset}.</li>
 * </ul>
 *
 * <p>On a match the method returns immediately, leaving the cursor and counter as
 * they are. If the cursor is exhausted, or a message with a different timestamp is
 * reached first, a warning is logged, the cursor is reset to
 * {@code lastMessageTimestamp} and {@code lastMessageCounter} is set back to zero.
 *
 * @param offset the recorded offset to look for: a per-timestamp message count, or
 *               a message ID when {@code tbMessageId} is set
 */
private void resetToOffset(long offset) {
    while (cursor.next()) {
        final RawMessage current = (RawMessage) cursor.getMessage();

        // Leaving the block of messages that share the recorded timestamp ends the search.
        if (current.getTimeStampMs() != lastMessageTimestamp) {
            break;
        }

        lastMessageCounter++;

        if (tbMessageId != null) {
            // ID-based lookup: compare the configured ID field of the decoded message.
            final Map<String, Object> fields = deserializer.getValues(current);
            final Long currentId = (Long) fields.get(tbMessageId);
            if (currentId == null || currentId.longValue() != offset) {
                continue;
            }
            LOG.info("Found last recorded message with ID: " + offset);
            return;
        }

        // Counter-based lookup: the offset is the ordinal of the message within this timestamp.
        if (lastMessageCounter == offset) {
            LOG.info("Found last recorded message: " + offset);
            return;
        }
    }

    // Not found: restart from the beginning of the recorded timestamp.
    LOG.warn("Last message with offset " + offset + " not found. Starting at timestamp: " + lastMessageTimestamp);
    cursor.reset(lastMessageTimestamp);
    lastMessageCounter = 0;
}