public void put()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSinkTask.java [140:162]


    public void put(Collection<SinkRecord> records) {
        boolean debug = LOG.isDebugEnabled();

        for (SinkRecord record : records) {
            if (serializer == null) {
                // ignore tombstone messages if the stream was not created yet
                if (msgStream == null && record.valueSchema() == null) {
                    LOG.warn("Ignoring tombstone record with the key " + record.key());
                    continue;
                }
                initSerializer(record);
            }

            if (debug) LOG.debug("Record: " + record);
            RawMessage message = serializer.serialize(record, lastMessageTimestamp);
            if (debug) LOG.debug("RawMessage: " + message);

            if (message != null) {
                lastMessageTimestamp = message.getTimeStampMs();
                send(message);
            }
        }
    }