in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [50:106]
public void start(Map<String, String> props) {
TBConnectorConfig config = new TBConnectorConfig(SOURCE_CONFIG_DEF, props);
String tbStream = config.getTBStream();
tbMessageId = config.getTBMessageIDField();
tbMessageType = config.getTBMessageType();
partition = Collections.singletonMap(PARTITION_ATTR, tbStream);
timebase = TickDBFactory.createFromUrl(config.getTBUrl(), config.getTBUser(), config.getTBPassword());
timebase.open(true);
DXTickStream msgStream = timebase.getStream(tbStream);
if (msgStream == null) {
throw new IllegalArgumentException("TimeBase stream \"" + tbStream + "\"does not exist");
}
RecordClassDescriptor msgType = null;
RecordClassDescriptor[] msgTypes = msgStream.getTypes();
if (msgTypes.length > 1) {
if (tbMessageType == null) {
throw new IllegalArgumentException("Specify type of messages to read from polymorphic TimeBase stream " + config.getTBStream());
}
msgType = getDescriptor(msgTypes, tbMessageType);
if (msgType == null) {
throw new IllegalArgumentException("TimeBase stream does not have \"" + tbMessageType + "\" message type");
}
} else if (msgTypes.length == 1) {
msgType = msgTypes[0];
if (tbMessageType != null && !tbMessageType.equals(msgType.getName())) {
throw new IllegalArgumentException("TimeBase stream does not have \"" + tbMessageType + "\" message type");
}
} else {
throw new IllegalArgumentException("TimeBase stream has no record descriptors");
}
deserializer = new RawMessageDeserializer(msgType, config);
Long lastTimestamp = 0L;
Long lastMessageOffset = null;
Map<String, Object> offset = context.offsetStorageReader().offset(partition);
if (offset != null) {
lastTimestamp = (Long) offset.get(TIMESTAMP_ATTR);
lastMessageOffset = (Long) offset.get(OFFSET_ATTR);
}
LOG.info("Starting TBSourceTask at timestamp: " + lastTimestamp + ", offset: " + lastMessageOffset);
SelectionOptions options = new SelectionOptions(true, true);
cursor = msgStream.select(lastTimestamp, options);
lastMessageTimestamp = lastTimestamp;
lastMessageCounter = 0;
if (lastMessageOffset != null) {
resetToOffset(lastMessageOffset);
}
}