in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/OffsetCalculator.java [49:68]
/**
 * Works out which topic partitions end up with a different dense offset once the
 * given transactions are taken into account.
 * <p>
 * Every partition already present in {@code offsets}, plus every partition that
 * appears in the offsets derived from {@code txToCommit}, is examined. For each one
 * the transaction offsets are merged with the holder's sparse committed offsets via
 * {@link MergeHelper#mergeWithDenseCompaction}. When the merge result is neither
 * {@code INITIAL_SYNC_POINT} nor the holder's current dense offset, the holder is
 * updated in place and the partition is included in the returned map.
 *
 * @param txToCommit batches of transactions to account for.
 * @return partitions whose dense offset changed, mapped to an {@link OffsetAndMetadata}
 *     built from the new offset; {@link Collections#emptyMap()} when {@code txToCommit}
 *     is empty.
 */
public Map<TopicPartition, OffsetAndMetadata> calculateChangedOffsets(List<List<TransactionWrapper>> txToCommit) {
    if (txToCommit.isEmpty()) {
        return Collections.emptyMap();
    }
    Lazy<TopicPartition, MutableLongList> txOffsets = calculateOffsetsFromTransactions(txToCommit);

    // Examine the union of partitions tracked so far and partitions touched by these transactions.
    Collection<TopicPartition> partitions = new HashSet<>(offsets.keySet());
    partitions.addAll(txOffsets.keySet());

    Map<TopicPartition, OffsetAndMetadata> changed = new HashMap<>();
    for (TopicPartition partition : partitions) {
        // NOTE(review): assumes offsets.get(...) never returns null, even for partitions
        // that so far appear only in the transaction offsets - confirm against the
        // declaration of 'offsets'.
        OffsetHolder holder = offsets.get(partition);
        long previousDense = holder.getLastDenseOffset();

        MutableLongList fromTransactions = txOffsets.get(partition);
        long mergedDense = MergeHelper.mergeWithDenseCompaction(fromTransactions,
            holder.getSparseCommittedOffsets(), previousDense);

        // Nothing to report when the merge yields the initial sync point or leaves the offset unchanged.
        if (mergedDense == INITIAL_SYNC_POINT || mergedDense == previousDense) {
            continue;
        }
        holder.setLastDenseOffset(mergedDense);
        changed.put(partition, new OffsetAndMetadata(mergedDense));
    }
    return changed;
}