in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [121:171]
/**
 * Registers the lead's background work: three periodic tasks (dense committed id save,
 * main heartbeat, consumer ping check) and two tasks submitted through {@code simpleTask}
 * (state maintenance and planning).
 * <p>
 * NOTE(review): how often {@code simpleTask} runs its argument is not visible here -
 * confirm against its definition.
 *
 * @param storedCommitted reference handed to the dense committed id save task
 * @param mainPingCheckPeriod period passed along with the main heartbeat task (units not visible here)
 * @param consumerPingCheckPeriod period passed along with the consumer ping check task (units not visible here)
 * @param gapDetectionStrategy strategy asked whether a gap exists in the planning state
 */
private void schedulePeriodicTasks(
    Reference<Long> storedCommitted,
    long mainPingCheckPeriod,
    long consumerPingCheckPeriod,
    final GapDetectionStrategy gapDetectionStrategy
) {
    schedulePeriodicTask(
        new DenseCommittedIdSavePeriodicTask(planningState, storedCommitted),
        LAST_DENSE_COMMITTED_SAVE_INTERVAL
    );
    schedulePeriodicTask(new MainHeartbeatPeriodicTask(this, manager), mainPingCheckPeriod);
    schedulePeriodicTask(new ConsumerPingCheckPeriodicTask(pingManager), consumerPingCheckPeriod);

    // State maintenance: finish reconciliation, pick up the loaded context, react to gaps.
    // The three steps run in this order; the first may change the reconciliation flag
    // that the last one reads.
    final Runnable stateMaintenance = new Runnable() {
        @Override
        public void run() {
            RPCService mainService = manager.main();
            boolean reconciliationDone = mainService != null
                && planningState.isReconciliationGoing()
                && planner.reconciliationFinishedConditionMet();
            if (reconciliationDone) {
                mainService
                    .get(ActiveCacheStoreService.class, ActiveCacheStoreService.NAME)
                    .stopReconciliation(clusterId);
                planningState.notifyReconcilliationStopped();
                resumePausedConsumers();
                LOGGER.info("[L] Reconciliation stopped");
            }

            if (!contextLoaded) {
                if (loader.isContextLoaded()) {
                    planner.updateContext(loader.getLastDenseCommitted(), loader.getSparseCommitted());
                    loader.stopAll();
                    contextLoaded = true;
                    LOGGER.info("[L] Lead has been loaded");
                }
            }

            // Nothing more to do while reconciliation is running or when no gap is reported.
            if (planningState.isReconciliationGoing() || !gapDetectionStrategy.gapDetected(planningState)) {
                return;
            }
            LOGGER.info("[L] Lead detected gap, starting reconciliation");
            callReconciliation(true);
        }
    };
    simpleTask(stateMaintenance);

    // Planning: merge every freshly planned response into the available work buffer.
    final Runnable planning = new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<UUID, LeadResponse> planned : planner.plan().entrySet()) {
                UUID key = planned.getKey();
                LeadResponse fresh = planned.getValue();

                // Take the buffered response out first; the merged result is put back below.
                LeadResponse buffered = availableWorkBuffer.remove(key);

                LongList idsToCommit = fresh.getToCommitIds();
                if (idsToCommit != null) {
                    LOGGER.debug("[L] Plan you {} on {}", idsToCommit, f(key));
                    planner.markInProgressTransactions(idsToCommit);
                }

                LeadResponse merged;
                if (buffered == null) {
                    merged = fresh;
                }
                else {
                    merged = buffered.add(fresh);
                }
                availableWorkBuffer.put(key, merged);
            }
        }
    };
    simpleTask(planning);
}