public Map calculateChangedOffsets()

in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/OffsetCalculator.java [49:68]


    public Map<TopicPartition, OffsetAndMetadata> calculateChangedOffsets(List<List<TransactionWrapper>> txToCommit) {
        if (txToCommit.isEmpty()) {
            return Collections.emptyMap();
        }
        Lazy<TopicPartition, MutableLongList> offsetsFromTransactions = calculateOffsetsFromTransactions(txToCommit);
        Collection<TopicPartition> allTopics = new HashSet<>(offsets.keySet());
        allTopics.addAll(offsetsFromTransactions.keySet());
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (TopicPartition topic : allTopics) {
            OffsetHolder offsetHolder = offsets.get(topic);
            long currentOffset = offsetHolder.getLastDenseOffset();
            long updatedOffset = MergeHelper.mergeWithDenseCompaction(offsetsFromTransactions.get(topic),
                offsetHolder.getSparseCommittedOffsets(), currentOffset);
            if (updatedOffset != INITIAL_SYNC_POINT && updatedOffset != currentOffset) {
                offsetHolder.setLastDenseOffset(updatedOffset);
                result.put(topic, new OffsetAndMetadata(updatedOffset));
            }
        }
        return result;
    }