in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [270:282]
private void callReconciliation(boolean gapFound) {
boolean servicesOk = !kafkaIsOutOfOrder && !mainIsOutOfOrder;
if (servicesOk && !planningState.isReconciliationGoing()) {
RPCService main = manager.main();
if (main != null) {
LOGGER.info("[L] Reconciliation started");
ActiveCacheStoreService service = main.get(ActiveCacheStoreService.class, ActiveCacheStoreService.NAME);
long endTxId = gapFound ? planningState.getLastReadTxId() : service.lastProcessedTxId();
service.startReconciliation(clusterId, planningState.lastDenseCommitted(), endTxId);
planningState.notifyReconcilliationStarted(endTxId);
}
}
}