public void execute()

in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/SubscriberConsumer.java [108:138]


    public void execute() {
        simpleTask(new Runnable() {
            @Override
            public void run() {
                pollTransaction();
            }
        });
        OffsetCalculator offsetCalculator = new OffsetCalculator();
        schedulePeriodicTask(new CommitOffsetsPeriodicTask(buffer, offsetCalculator, consumer),
            COMMIT_OFFSETS_PERIOD);
        schedulePeriodicTask(new BufferOverflowConditionPeriodicTask(this, buffer,
                bufferOverflowCondition, lead), COMMIT_OFFSETS_PERIOD);
        schedulePeriodicTask(new Runnable() {
            @Override
            public void run() {
                buffer.compact(lead.getLastDenseCommittedId());
            }
        }, POLL_CYCLES_BETWEEN_COMPACTION);

        LOGGER.info("[C] Started polling kafka for messages");
        consumer.subscribe(Arrays.asList(remoteTopic, reconciliationTopic),
            new RebalanceListener(buffer, offsetCalculator, consumer));
        try {
            super.execute();
        }
        finally {
            LOGGER.info("[C] Ended polling kafka for messages");
            consumer.close();
            doneNotifier.close();
        }
    }