in timebase-kafka-connector/src/main/java/deltix/kafka/connect/TBSourceTask.java [219:250]
/**
 * Manual smoke test: starts a {@link TBSourceTask} against a hard-coded
 * TimeBase URL and stream, then prints the first record of every polled batch.
 *
 * <p>The task is initialized with a stub {@link SourceTaskContext} whose
 * {@code configs()} and offset lookups all return {@code null}. Polling
 * continues until {@code poll()} returns {@code null}; the task is always
 * stopped afterwards, even if start-up or polling fails.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the task fails to start or poll
 */
public static void main(String[] args) throws Exception {
    Map<String, String> config = new HashMap<>();
    config.put(TB_URL_PROP, "dxtick://localhost:8011");
    config.put(TB_STREAM_PROP, "sink-test");
    // Additional properties left disabled; uncomment to try them out.
    //config.put(TB_MESSAGE_TYPE_PROP, "deltix.timebase.api.messages.securities.Currency");
    //config.put(TB_MESSAGE_ID_PROP, "currentTime");
    config.put(TOPIC_PROP, "tb-orders");

    // Stub reader: every offset lookup answers null.
    final OffsetStorageReader stubOffsetReader = new OffsetStorageReader() {
        @Override
        public <T> Map<String, Object> offset(Map<String, T> map) {
            return null;
        }

        @Override
        public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
            return null;
        }
    };

    TBSourceTask task = new TBSourceTask();
    try {
        task.initialize(new SourceTaskContext() {
            @Override
            public Map<String, String> configs() {
                return null;
            }

            @Override
            public OffsetStorageReader offsetStorageReader() {
                return stubOffsetReader;
            }
        });
        task.start(config);

        // A null batch ends the loop.
        for (List<SourceRecord> records = task.poll(); records != null; records = task.poll()) {
            // An empty (non-null) batch has no first element; skip it instead of
            // failing with IndexOutOfBoundsException.
            if (records.isEmpty()) {
                continue;
            }
            System.out.println(records.get(0));
        }
    } finally {
        task.stop();
    }
}