in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timebase/RawMessageDataFeeder.java [114:161]
private void fetchDataStream() throws InterruptedException {
if (stream == null) {
LOG.error()
.append("Stream could not be NULL.")
.commit();
return;
}
try {
SelectionOptions selectionOptions = new SelectionOptions(true, true, ChannelQualityOfService.MAX_THROUGHPUT);
selectionOptions.versionTracking = true;
cursor = stream.select(
recoveryTime,
selectionOptions,
null,
(CharSequence[]) null
);
while (cursor.next()) {
InstrumentMessage message = cursor.getMessage();
InstrumentMessage messageClone = message.clone();
RawMessage msg = (RawMessage) messageClone;
if (msg != null) {
// handle migration messages
if ("@SYSTEM".equals(msg.getSymbol())) {
//TODO remove workaround when when subscription feature will be released
if (msg.type != null && SchemaChangeMessage.class.getName().equals(msg.type.getName())) {
SchemaChangeMessage changeMessage = boundDecode(msg);
migrationService.apply(changeMessage, stream.getName());
}
continue;
}
if (consumedMessages.incrementAndGet() % batchSize == 0) {
queue.transfer(msg);
} else {
queue.add(msg);
}
} else {
LOG.warn().append("Null element occurred in stream: ").append(stream.getName()).commit();
}
}
} finally {
close();
}
}