private void pollTransaction()

in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/SubscriberConsumer.java [140:178]


    /**
     * Runs a single polling round.
     * <p>
     * Polls the consumer with {@code POLL_TIMEOUT}, wraps every received record in a
     * {@link TransactionWrapper} and adds it to the buffer, then reports the metadata of the
     * read transactions (sorted with {@code COMPARATOR}) to the lead. If a metadata resend was
     * requested, the request flag is cleared and the metadata of the transactions returned by
     * {@code buffer.getUncommittedTxs()} is placed in the same notification ahead of the newly
     * polled ones (before sorting).
     * <p>
     * When the lead answers with a non-null response, ids it reports as already processed are
     * marked as committed in the buffer, and ids it reports as ready to commit are marked as
     * in progress and handed to the committer for asynchronous commit.
     */
    private void pollTransaction() {
        ConsumerRecords<ByteBuffer, ByteBuffer> polled = consumer.poll(POLL_TIMEOUT);
        List<TransactionMetadata> outgoing;

        if (!metadataResendRequested) {
            outgoing = new ArrayList<>(polled.count());
        }
        else {
            // Clear the request first, then seed the batch with what the buffer reports as uncommitted.
            metadataResendRequested = false;
            outgoing = new ArrayList<>(buffer.size() + polled.count());
            for (TransactionWrapper pending : buffer.getUncommittedTxs()) {
                outgoing.add(pending.deserializedMetadata());
            }
            LOGGER.info("[C] Resending metadata {} from {}", outgoing.size(), f(consumerId));
        }

        // Every polled record is both reported to the lead and kept in the local buffer.
        for (ConsumerRecord<ByteBuffer, ByteBuffer> polledRecord : polled) {
            TransactionMetadata txMetadata = serializer.deserialize(polledRecord.key());
            TransactionWrapper incoming = new TransactionWrapper(polledRecord, txMetadata);

            outgoing.add(txMetadata);
            buffer.add(incoming);
        }

        Collections.sort(outgoing, COMPARATOR);
        if (!outgoing.isEmpty()) {
            LOGGER.debug("[C] Before send {} from {}", outgoing, f(consumerId));
        }

        // The lead is notified on every round, even when there is nothing new to report.
        LeadResponse reply = lead.notifyTransactionsRead(consumerId, outgoing);
        if (reply == null) {
            return;
        }

        if (reply.getAlreadyProcessedIds() != null) {
            buffer.markAlreadyCommitted(reply.getAlreadyProcessedIds());
            LOGGER.debug("[C] Remove {} in {}", reply.getAlreadyProcessedIds(), f(consumerId));
        }

        if (reply.getToCommitIds() != null) {
            LOGGER.debug("[C] Gotcha {} in {}", reply.getToCommitIds(), f(consumerId));
            // Mark the ids as in progress before the asynchronous commit is started.
            buffer.markInProgress(reply.getToCommitIds());
            committer.commitAsync(reply.getToCommitIds(), deserializerClosure, doneNotifier, fullCommitHandler);
        }
    }