public RawMessage serialize()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [82:109]


    public RawMessage serialize(SinkRecord record, long lastMessageTimeStamp) {
        Struct value = (Struct) record.value();
        if (value == null && (tbTombstoneField == null || descriptor == null)) {
            LOG.warn("Ignoring tombstone record with the key " + record.key());
            return null;
        }
        else if (descriptor == null) {
            descriptor = buildDescriptor(record.valueSchema());
            LOG.info("Built Descriptor for " + record.valueSchema().name() + ": " + descriptor);
        }

        FixedUnboundEncoder encoder = getEncoder(descriptor);

        output.reset();
        encoder.beginWrite(output);
        writeStruct(value, record, descriptor, encoder);
        encoder.endWrite();

        RawMessage message = new RawMessage(descriptor);
        message.setInstrumentType(instrumentField == null || value == null ? InstrumentType.CUSTOM : InstrumentType.valueOf(value.getString(instrumentField)));
        message.setSymbol(symbolField == null || value == null ? NO_SYMBOL : value.getString(symbolField));
        Long timestamp = timeField == null  ? record.timestamp() : value == null ? lastMessageTimeStamp : getTimestamp(value, timeField);
        if (timestamp != null)
            message.setTimeStampMs(timestamp);

        message.copyBytes(output, 0);
        return message;
    }