in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/LocalLeadContextLoader.java [95:118]
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);
if (records.isEmpty()) {
if (!stalled && checkStalled(consumer)) {
LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
stalled = true;
lead.notifyLocalLoaderStalled(leadId, localLoaderId);
}
// ToDo: Consider sending empty messages for heartbeat sake.
return;
}
if (stalled) {
stalled = false;
}
MutableLongList committedIds = new LongArrayList(records.count());
for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
committedIds.add(record.timestamp());
}
committedIds.sortThis();
lead.updateInitialContext(localLoaderId, committedIds);
consumer.commitSync();
}