private RawMessage nextMessage()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [190:217]


    private RawMessage nextMessage(TickCursor cursor) {
        RawMessage message = null;
        if (!cursor.isClosed() && cursor.next()) {
            message = (RawMessage) cursor.getMessage();
            if (message.getTimeStampMs() != lastMessageTimestamp) {
                // we need to wait if cursor is too close to the live edge
                while (message == null || message.getTimeStampMs() > System.currentTimeMillis() - OFFSET_INTERVAL) {
                    if (! live)
                        return null; // stop here to avoid reading messages out of order

                    LOG.info("Waiting for more messages " + ACCUMULATION_INTERVAL + " ms");
                    sleep(ACCUMULATION_INTERVAL);

                    if (cursor.isClosed())
                        return null;

                    cursor.reset(lastMessageTimestamp + 1);
                    message = cursor.next() ? (RawMessage) cursor.getMessage() : null;
                }

                lastMessageTimestamp = message.getTimeStampMs();
                lastMessageCounter = 1;
            } else {
                lastMessageCounter++;
            }
        }
        return message;
    }