in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [78:119]
/**
 * Creates a lead identified by a freshly generated random UUID, builds its planner, consumer ping
 * manager and context loader, and finally schedules the periodic tasks.
 *
 * @param ignite Ignite instance; stored, passed to the context loader and used to obtain the
 *      reference kept under {@code LAST_DENSE_COMMITTED_KEY}.
 * @param clusterId Cluster id; stored in a field.
 * @param dataRecoveryConfig Data recovery configuration; stored and passed to the context loader.
 * @param manager RPC manager; stored in a field.
 * @param kafkaFactory Kafka factory; stored and passed to the context loader.
 * @param gapDetectionStrategy Gap detection strategy; passed to {@code schedulePeriodicTasks}.
 * @param kafkaService Kafka service; passed to the context loader.
 * @param planningState Planning state; stored and used to construct the {@code LeadPlanner}.
 * @param mainPingCheckPeriod Main ping check period passed to {@code schedulePeriodicTasks};
 *      {@code PING_CHECK_PERIOD} is substituted when {@code null}.
 * @param consumerPingCheckPeriod Consumer ping check period passed to {@code schedulePeriodicTasks};
 *      {@code PING_CHECK_PERIOD} is substituted when {@code null}.
 * @param strategy Ping check strategy; passed to the {@code ConsumerPingManager}.
 * @param clusterGroupService Cluster group service; passed to the context loader.
 */
public Lead(
    Ignite ignite,
    @Named(DataCapturerBusConfiguration.CLUSTER_ID) UUID clusterId,
    DataRecoveryConfig dataRecoveryConfig,
    RPCManager manager,
    KafkaFactory kafkaFactory,
    GapDetectionStrategy gapDetectionStrategy,
    PublisherKafkaService kafkaService,
    LeadPlanningState planningState,
    @Named(MAIN_PING_CHECK_KEY) Long mainPingCheckPeriod,
    @Named(CONSUMER_PING_CHECK_KEY) Long consumerPingCheckPeriod,
    ConsumerPingCheckStrategy strategy,
    ClusterGroupService clusterGroupService
) {
    // Injected collaborators that are simply retained.
    this.ignite = ignite;
    this.clusterId = clusterId;
    this.manager = manager;
    this.kafkaFactory = kafkaFactory;
    this.dataRecoveryConfig = dataRecoveryConfig;
    this.planningState = planningState;

    // Every lead instance gets its own random identity.
    leadId = UUID.randomUUID();

    // The same buffer instance is kept in a field and shared with the ping manager.
    availableWorkBuffer = new ConcurrentHashMap<>();
    planner = new LeadPlanner(planningState);
    pingManager = new ConsumerPingManager(strategy, availableWorkBuffer, planner);

    // initIfAbsent(NOT_LOADED) is invoked on the reference before the loader receives it.
    final Reference<Long> lastDenseCommitted =
        AtomicsHelper.getReference(ignite, LAST_DENSE_COMMITTED_KEY, false);
    lastDenseCommitted.initIfAbsent(LeadContextLoader.NOT_LOADED);
    loader = new LeadContextLoader(
        ignite,
        kafkaFactory,
        leadId,
        dataRecoveryConfig,
        lastDenseCommitted,
        clusterGroupService,
        kafkaService
    );

    // Missing (null) periods fall back to the shared default.
    final Long mainPeriod = mainPingCheckPeriod != null ? mainPingCheckPeriod : PING_CHECK_PERIOD;
    final Long consumerPeriod = consumerPingCheckPeriod != null ? consumerPingCheckPeriod : PING_CHECK_PERIOD;
    schedulePeriodicTasks(lastDenseCommitted, mainPeriod, consumerPeriod, gapDetectionStrategy);

    LOGGER.info(
        "[L] New lead created with id {}{}",
        f(leadId),
        isReconciliationGoing() ? ", reconciliation is going" : ""
    );
}