public void start()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [50:106]


    public void start(Map<String, String> props) {
        TBConnectorConfig config = new TBConnectorConfig(SOURCE_CONFIG_DEF, props);

        String tbStream = config.getTBStream();
        tbMessageId = config.getTBMessageIDField();
        tbMessageType = config.getTBMessageType();

        partition = Collections.singletonMap(PARTITION_ATTR, tbStream);

        timebase = TickDBFactory.createFromUrl(config.getTBUrl(), config.getTBUser(), config.getTBPassword());
        timebase.open(true);

        DXTickStream msgStream = timebase.getStream(tbStream);
        if (msgStream == null) {
            throw new IllegalArgumentException("TimeBase stream \"" + tbStream + "\"does not exist");
        }
        RecordClassDescriptor msgType = null;
        RecordClassDescriptor[] msgTypes = msgStream.getTypes();
        if (msgTypes.length > 1) {
            if (tbMessageType == null) {
                throw new IllegalArgumentException("Specify type of messages to read from polymorphic TimeBase stream " + config.getTBStream());
            }
            msgType = getDescriptor(msgTypes, tbMessageType);
            if (msgType == null) {
                throw new IllegalArgumentException("TimeBase stream does not have \"" + tbMessageType + "\" message type");
            }
        } else if (msgTypes.length == 1) {
            msgType = msgTypes[0];
            if (tbMessageType != null && !tbMessageType.equals(msgType.getName())) {
                throw new IllegalArgumentException("TimeBase stream does not have \"" + tbMessageType + "\" message type");
            }
        } else {
            throw new IllegalArgumentException("TimeBase stream has no record descriptors");
        }

        deserializer = new RawMessageDeserializer(msgType, config);

        Long lastTimestamp = 0L;
        Long lastMessageOffset = null;
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);

        if (offset != null) {
            lastTimestamp = (Long) offset.get(TIMESTAMP_ATTR);
            lastMessageOffset = (Long) offset.get(OFFSET_ATTR);
        }
        LOG.info("Starting TBSourceTask at timestamp: " + lastTimestamp + ", offset: " + lastMessageOffset);

        SelectionOptions options = new SelectionOptions(true, true);
        cursor = msgStream.select(lastTimestamp, options);

        lastMessageTimestamp = lastTimestamp;
        lastMessageCounter = 0;

        if (lastMessageOffset != null) {
            resetToOffset(lastMessageOffset);
        }
    }