in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [313:331]
protected void correctSmallestBoundsOfOffsetRanges(List<TopicPartition> partitions,
Map<TopicPartition, RecordCollector> collectors,
Map<TopicPartition, OffsetRange> offsetRanges) {
partitions.forEach(topicPartition -> {
long smallestOffset = collectors.get(topicPartition).getSmallestScannedOffset();
long smallestBound = offsetRanges.get(topicPartition).getSmallest();
if(smallestOffset >= 0 && smallestBound > smallestOffset) {
OffsetRange offsetRange = offsetRanges.get(topicPartition);
offsetRanges.put(topicPartition,
new OffsetRange( smallestOffset,
true,
offsetRange.getLargest(),
offsetRange.isLargestInclusive()
));
}
});
}