in src/main/java/com/epam/eco/commons/kafka/helpers/TopicOffsetRangeFetcher.java [119:138]
/**
 * Builds the offset range of every requested partition from the beginning and
 * end offsets reported by the consumer.
 *
 * <p>For a partition whose end offset is greater than its beginning offset, the
 * upper bound of the range is {@code endOffset - 1} and both boolean flags passed
 * to {@code OffsetRange.with} are {@code true}. Otherwise the upper bound is the
 * end offset as reported and both flags are {@code false}.
 * NOTE(review): the flags presumably mark the bounds as inclusive - confirm
 * against {@code OffsetRange.with}.
 *
 * @param consumer consumer queried for the beginning and end offsets
 * @param partitions partitions to build ranges for
 * @return ranges keyed by partition, ordered by {@code TopicPartitionComparator.INSTANCE}
 * @throws NullPointerException if either offsets map returned by the consumer
 *         has no value for one of the requested partitions
 */
protected static Map<TopicPartition, OffsetRange> doFetch(
        Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions) {
    Map<TopicPartition, Long> earliestByPartition = consumer.beginningOffsets(partitions);
    Map<TopicPartition, Long> latestByPartition = consumer.endOffsets(partitions);

    Map<TopicPartition, OffsetRange> ranges = new TreeMap<>(TopicPartitionComparator.INSTANCE);
    for (TopicPartition topicPartition : partitions) {
        long lowerBound = earliestByPartition.get(topicPartition);
        long upperBound = latestByPartition.get(topicPartition);

        // The same comparison drives both flags and the upper-bound adjustment.
        boolean endIsPastBeginning = upperBound > lowerBound;
        if (endIsPastBeginning) {
            // Step back by one from the reported end offset.
            upperBound -= 1;
        }

        OffsetRange range = OffsetRange.with(
                lowerBound,
                endIsPastBeginning,
                upperBound,
                endIsPastBeginning);
        ranges.put(topicPartition, range);
    }
    return ranges;
}