private void schedulePeriodicTasks()

in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [121:171]


    /**
     * Registers the recurring jobs of this lead.
     * <p>
     * Three jobs are handed to {@code schedulePeriodicTask} with an explicit period: saving of the
     * dense committed id, the main heartbeat and the consumer ping check. Two further jobs are handed
     * to {@code simpleTask}: one that watches reconciliation, context loading and gap detection, and
     * one that runs the planner and merges its output into {@code availableWorkBuffer}.
     * NOTE(review): how and how often {@code simpleTask} runs its argument is not visible here - confirm.
     *
     * @param storedCommitted reference passed to the {@link DenseCommittedIdSavePeriodicTask}.
     * @param mainPingCheckPeriod period used for the {@link MainHeartbeatPeriodicTask}
     *     (time unit is defined by {@code schedulePeriodicTask}).
     * @param consumerPingCheckPeriod period used for the {@link ConsumerPingCheckPeriodicTask}
     *     (time unit is defined by {@code schedulePeriodicTask}).
     * @param gapDetectionStrategy strategy asked on every state check whether a gap is present.
     */
    private void schedulePeriodicTasks(
        Reference<Long> storedCommitted,
        long mainPingCheckPeriod,
        long consumerPingCheckPeriod,
        final GapDetectionStrategy gapDetectionStrategy
    ) {
        // Jobs with an explicit period.
        schedulePeriodicTask(
            new DenseCommittedIdSavePeriodicTask(planningState, storedCommitted), LAST_DENSE_COMMITTED_SAVE_INTERVAL);
        schedulePeriodicTask(
            new MainHeartbeatPeriodicTask(this, manager), mainPingCheckPeriod);
        schedulePeriodicTask(
            new ConsumerPingCheckPeriodicTask(pingManager), consumerPingCheckPeriod);

        // State watcher: finishes reconciliation, applies the loaded context, reacts to gaps.
        Runnable stateWatcher = new Runnable() {
            @Override
            public void run() {
                RPCService mainService = manager.main();
                // Evaluation order matters: the planner is consulted only while reconciliation is going.
                boolean reconciliationFinished = mainService != null
                    && planningState.isReconciliationGoing()
                    && planner.reconciliationFinishedConditionMet();

                if (reconciliationFinished) {
                    mainService
                        .get(ActiveCacheStoreService.class, ActiveCacheStoreService.NAME)
                        .stopReconciliation(clusterId);
                    planningState.notifyReconcilliationStopped();
                    resumePausedConsumers();
                    LOGGER.info("[L] Reconciliation stopped");
                }

                // The loader is queried only until the context has been applied once.
                boolean contextJustLoaded = !contextLoaded && loader.isContextLoaded();

                if (contextJustLoaded) {
                    planner.updateContext(loader.getLastDenseCommitted(), loader.getSparseCommitted());
                    loader.stopAll();
                    contextLoaded = true;
                    LOGGER.info("[L] Lead has been loaded");
                }

                // Gap detection is skipped while a reconciliation is already in progress.
                boolean reconciliationIdle = !planningState.isReconciliationGoing();

                if (reconciliationIdle && gapDetectionStrategy.gapDetected(planningState)) {
                    LOGGER.info("[L] Lead detected gap, starting reconciliation");
                    callReconciliation(true);
                }
            }
        };
        simpleTask(stateWatcher);

        // Planner: merges freshly planned work into whatever is still buffered under the same key.
        Runnable planningJob = new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<UUID, LeadResponse> planned : planner.plan().entrySet()) {
                    UUID id = planned.getKey();
                    LeadResponse fresh = planned.getValue();

                    // Take the buffered response out first; it is put back (merged) at the end.
                    LeadResponse buffered = availableWorkBuffer.remove(id);

                    LongList idsToCommit = fresh.getToCommitIds();

                    if (idsToCommit != null) {
                        LOGGER.debug("[L] Plan you {} on {}", idsToCommit, f(id));
                        planner.markInProgressTransactions(idsToCommit);
                    }

                    LeadResponse merged;

                    if (buffered == null) {
                        merged = fresh;
                    }
                    else {
                        merged = buffered.add(fresh);
                    }

                    availableWorkBuffer.put(id, merged);
                }
            }
        };
        simpleTask(planningJob);
    }