in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [190:217]
private RawMessage nextMessage(TickCursor cursor) {
RawMessage message = null;
if (!cursor.isClosed() && cursor.next()) {
message = (RawMessage) cursor.getMessage();
if (message.getTimeStampMs() != lastMessageTimestamp) {
// we need to wait if cursor is too close to the live edge
while (message == null || message.getTimeStampMs() > System.currentTimeMillis() - OFFSET_INTERVAL) {
if (! live)
return null; // stop here to avoid reading messages out of order
LOG.info("Waiting for more messages " + ACCUMULATION_INTERVAL + " ms");
sleep(ACCUMULATION_INTERVAL);
if (cursor.isClosed())
return null;
cursor.reset(lastMessageTimestamp + 1);
message = cursor.next() ? (RawMessage) cursor.getMessage() : null;
}
lastMessageTimestamp = message.getTimeStampMs();
lastMessageCounter = 1;
} else {
lastMessageCounter++;
}
}
return message;
}