in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSinkTask.java [140:162]
public void put(Collection<SinkRecord> records) {
boolean debug = LOG.isDebugEnabled();
for (SinkRecord record : records) {
if (serializer == null) {
// ignore tombstone messages if the stream was not created yet
if (msgStream == null && record.valueSchema() == null) {
LOG.warn("Ignoring tombstone record with the key " + record.key());
continue;
}
initSerializer(record);
}
if (debug) LOG.debug("Record: " + record);
RawMessage message = serializer.serialize(record, lastMessageTimestamp);
if (debug) LOG.debug("RawMessage: " + message);
if (message != null) {
lastMessageTimestamp = message.getTimeStampMs();
send(message);
}
}
}