in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/SubscriberConsumer.java [108:138]
public void execute() {
    // Registers the transaction polling task and the periodic maintenance tasks
    // (offset commit, buffer overflow check, buffer compaction), subscribes the
    // consumer to the remote and reconciliation topics, and then delegates to
    // super.execute(). Whatever way that call ends, the consumer and the done
    // notifier are closed before this method returns or propagates an exception.

    // Main unit of work: poll a transaction.
    simpleTask(new Runnable() {
        @Override
        public void run() {
            pollTransaction();
        }
    });

    // The same calculator instance is shared by the periodic commit task and the
    // rebalance listener registered below.
    OffsetCalculator offsetCalculator = new OffsetCalculator();

    schedulePeriodicTask(new CommitOffsetsPeriodicTask(buffer, offsetCalculator, consumer),
        COMMIT_OFFSETS_PERIOD);
    // NOTE(review): the overflow check is scheduled with COMMIT_OFFSETS_PERIOD as well;
    // confirm that sharing the commit period is intended.
    schedulePeriodicTask(new BufferOverflowConditionPeriodicTask(this, buffer,
        bufferOverflowCondition, lead), COMMIT_OFFSETS_PERIOD);
    // Compacts the buffer using the last dense committed id reported by the lead.
    schedulePeriodicTask(new Runnable() {
        @Override
        public void run() {
            buffer.compact(lead.getLastDenseCommittedId());
        }
    }, POLL_CYCLES_BETWEEN_COMPACTION);

    LOGGER.info("[C] Started polling kafka for messages");
    try {
        // Subscribing inside the try block guarantees that a failure here still
        // releases the consumer and the notifier in the finally block below.
        consumer.subscribe(Arrays.asList(remoteTopic, reconciliationTopic),
            new RebalanceListener(buffer, offsetCalculator, consumer));
        super.execute();
    }
    finally {
        LOGGER.info("[C] Ended polling kafka for messages");
        try {
            consumer.close();
        }
        finally {
            // Must run even when closing the consumer throws; otherwise the
            // notifier would be left open.
            doneNotifier.close();
        }
    }
}