in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [148:168]
/**
 * Reads messages from the cursor until one matches the configured message type
 * and returns it as a single-element list.
 *
 * <p>A message is accepted when {@code tbMessageType} is {@code null} or equals the
 * name of the message's type; all other messages are skipped.
 *
 * @return a list containing exactly one record for the first accepted message, or
 *         {@code null} when {@code nextMessage} returns {@code null} before any
 *         message is accepted
 * @throws InterruptedException if the thread was interrupted while reading, or if
 *         reading or deserializing a message failed; in the latter case the original
 *         failure is attached as the cause
 */
public List<SourceRecord> poll() throws InterruptedException {
    boolean debug = LOG.isDebugEnabled();
    try {
        RawMessage message;
        while ((message = nextMessage(cursor)) != null) {
            if (debug) {
                LOG.debug("RawMessage: " + message);
            }
            if (tbMessageType == null || tbMessageType.equals(message.type.getName())) {
                SourceRecord record = deserializer.deserialize(message, partition, lastMessageTimestamp, lastMessageCounter);
                if (debug) {
                    LOG.debug("Record: " + record.value() + " (" + record.sourceOffset() + ")");
                }
                return Arrays.asList(record);
            }
        }
    }
    catch (Exception ex) {
        // A genuine interruption is not a read failure: propagate it untouched instead of
        // logging it as an error and replacing it with a different exception instance.
        if (ex instanceof InterruptedException) {
            throw (InterruptedException) ex;
        }
        LOG.error("Failed to read next message", ex);
        // InterruptedException has no cause-accepting constructor, so attach the root
        // failure explicitly; otherwise callers only see the generic "Exiting" message.
        InterruptedException exit = new InterruptedException("Exiting");
        exit.initCause(ex);
        throw exit;
    }
    // nextMessage reported no further message and nothing was accepted.
    return null;
}