in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [49:80]
public RawMessageSerializer(TBConnectorConfig config, RecordClassDescriptor descriptor, Schema schema) {
this.flattenFields = config.getFlattenFields();
this.tbOffsetField = config.getTBMessageIDField();
this.tbKeyField = config.getTBMessageKeyField();
this.tbTombstoneField = config.getTBMessageTombstoneField();
this.instrumentField = config.getInstrumentField();
this.symbolField = config.getSymbolField();
this.timeField = config.getTimeField();
this.fieldSelection = config.getFieldSelection();
this.timestampFields = config.getTimestampFields();
this.fieldMap = config.getNameAliases();
if (this.tbTombstoneField != null && this.tbKeyField == null) {
throw new IllegalArgumentException("TimeBase key field is required when storing tombstone records");
}
this.descriptor = descriptor;
if (this.descriptor != null) {
if (schema != null)
validateSchema(schema);
if (tbOffsetField != null && !descriptor.hasField(tbOffsetField))
throw new IllegalArgumentException("TimeBase stream descriptor is missing configured offset field: " + tbOffsetField);
if (tbKeyField != null && !descriptor.hasField(tbKeyField))
throw new IllegalArgumentException("TimeBase stream descriptor is missing configured key field: " + tbKeyField);
if (tbTombstoneField != null && !descriptor.hasField(tbTombstoneField))
throw new IllegalArgumentException("TimeBase stream descriptor is missing configured tombstone field: " + tbTombstoneField);
LOG.info("Using existing TimeBase stream descriptor: " + descriptor);
} else {
this.descriptor = buildDescriptor(schema);
LOG.info("Built TimeBase stream descriptor for " + schema.name() + ": " + this.descriptor);
}
}