in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [82:109]
public RawMessage serialize(SinkRecord record, long lastMessageTimeStamp) {
Struct value = (Struct) record.value();
if (value == null && (tbTombstoneField == null || descriptor == null)) {
LOG.warn("Ignoring tombstone record with the key " + record.key());
return null;
}
else if (descriptor == null) {
descriptor = buildDescriptor(record.valueSchema());
LOG.info("Built Descriptor for " + record.valueSchema().name() + ": " + descriptor);
}
FixedUnboundEncoder encoder = getEncoder(descriptor);
output.reset();
encoder.beginWrite(output);
writeStruct(value, record, descriptor, encoder);
encoder.endWrite();
RawMessage message = new RawMessage(descriptor);
message.setInstrumentType(instrumentField == null || value == null ? InstrumentType.CUSTOM : InstrumentType.valueOf(value.getString(instrumentField)));
message.setSymbol(symbolField == null || value == null ? NO_SYMBOL : value.getString(symbolField));
Long timestamp = timeField == null ? record.timestamp() : value == null ? lastMessageTimeStamp : getTimestamp(value, timeField);
if (timestamp != null)
message.setTimeStampMs(timestamp);
message.copyBytes(output, 0);
return message;
}