private void fetchDataStream()

in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timebase/RawMessageDataFeeder.java [114:161]


    private void fetchDataStream() throws InterruptedException {
        if (stream == null) {
            LOG.error()
                    .append("Stream could not be NULL.")
                    .commit();
            return;
        }

        try {
            SelectionOptions selectionOptions = new SelectionOptions(true, true, ChannelQualityOfService.MAX_THROUGHPUT);
            selectionOptions.versionTracking = true;
            cursor = stream.select(
                    recoveryTime,
                    selectionOptions,
                    null,
                    (CharSequence[]) null
            );

            while (cursor.next()) {
                InstrumentMessage message = cursor.getMessage();
                InstrumentMessage messageClone = message.clone();

                RawMessage msg = (RawMessage) messageClone;

                if (msg != null) {
                    // handle migration messages
                    if ("@SYSTEM".equals(msg.getSymbol())) {
                        //TODO remove workaround when when subscription feature will be released
                        if (msg.type != null && SchemaChangeMessage.class.getName().equals(msg.type.getName())) {
                            SchemaChangeMessage changeMessage = boundDecode(msg);
                            migrationService.apply(changeMessage, stream.getName());
                        }
                        continue;
                    }

                    if (consumedMessages.incrementAndGet() % batchSize == 0) {
                        queue.transfer(msg);
                    } else {
                        queue.add(msg);
                    }
                } else {
                    LOG.warn().append("Null element occurred in stream: ").append(stream.getName()).commit();
                }
            }
        } finally {
            close();
        }
    }