public Lead()

in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [78:119]


    /**
     * Creates a lead identified by a freshly generated random {@link UUID}.
     * <p>
     * The constructor stores the supplied collaborators, builds the planner, the consumer ping manager and the
     * context loader, initializes the reference obtained for {@code LAST_DENSE_COMMITTED_KEY} with
     * {@code LeadContextLoader.NOT_LOADED} if it is absent, schedules the periodic tasks and finally logs the
     * creation of the lead.
     *
     * @param ignite Ignite instance; stored and also used to obtain the last dense committed reference.
     * @param clusterId Cluster id; stored.
     * @param dataRecoveryConfig Data recovery configuration; stored and handed to the context loader.
     * @param manager RPC manager; stored.
     * @param kafkaFactory Kafka factory; stored and handed to the context loader.
     * @param gapDetectionStrategy Gap detection strategy; forwarded to {@code schedulePeriodicTasks}.
     * @param kafkaService Publisher Kafka service; handed to the context loader.
     * @param planningState Planning state; stored and used to create the planner.
     * @param mainPingCheckPeriod Main ping check period; {@code PING_CHECK_PERIOD} is used when {@code null}.
     * @param consumerPingCheckPeriod Consumer ping check period; {@code PING_CHECK_PERIOD} is used when
     *      {@code null}.
     * @param strategy Consumer ping check strategy; handed to the consumer ping manager.
     * @param clusterGroupService Cluster group service; handed to the context loader.
     */
    public Lead(
        Ignite ignite,
        @Named(DataCapturerBusConfiguration.CLUSTER_ID) UUID clusterId,
        DataRecoveryConfig dataRecoveryConfig,
        RPCManager manager,
        KafkaFactory kafkaFactory,
        GapDetectionStrategy gapDetectionStrategy,
        PublisherKafkaService kafkaService,
        LeadPlanningState planningState,
        @Named(MAIN_PING_CHECK_KEY) Long mainPingCheckPeriod,
        @Named(CONSUMER_PING_CHECK_KEY) Long consumerPingCheckPeriod,
        ConsumerPingCheckStrategy strategy,
        ClusterGroupService clusterGroupService
    ) {
        // Injected collaborators that are kept as-is.
        this.ignite = ignite;
        this.manager = manager;
        this.clusterId = clusterId;
        this.kafkaFactory = kafkaFactory;
        this.planningState = planningState;
        this.dataRecoveryConfig = dataRecoveryConfig;

        // State owned by this lead. The ping manager receives the work buffer and the planner,
        // so both have to exist before it is built.
        this.leadId = UUID.randomUUID();
        this.availableWorkBuffer = new ConcurrentHashMap<>();
        this.planner = new LeadPlanner(planningState);
        this.pingManager = new ConsumerPingManager(strategy, this.availableWorkBuffer, this.planner);

        // The same reference is handed to both the loader and the periodic tasks.
        Reference<Long> lastDenseCommitted = AtomicsHelper.getReference(ignite, LAST_DENSE_COMMITTED_KEY, false);
        lastDenseCommitted.initIfAbsent(LeadContextLoader.NOT_LOADED);

        this.loader = new LeadContextLoader(
            ignite,
            kafkaFactory,
            this.leadId,
            dataRecoveryConfig,
            lastDenseCommitted,
            clusterGroupService,
            kafkaService
        );

        // Fall back to the default period when a period was not supplied.
        final Long mainPeriod = mainPingCheckPeriod != null ? mainPingCheckPeriod : PING_CHECK_PERIOD;
        final Long consumerPeriod = consumerPingCheckPeriod != null ? consumerPingCheckPeriod : PING_CHECK_PERIOD;

        schedulePeriodicTasks(lastDenseCommitted, mainPeriod, consumerPeriod, gapDetectionStrategy);

        LOGGER.info(
            "[L] New lead created with id {}{}",
            f(this.leadId),
            isReconciliationGoing() ? ", reconciliation is going" : ""
        );
    }