in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [233:259]
private void calculateNextChunkRange(Map<TopicPartition, OffsetRange> currentChunkOffsets,
Map<TopicPartition, OffsetRange> offsetRanges,
Map<TopicPartition, BiDirectionalRecordCollector> collectors) {
currentChunkOffsets.keySet().forEach(topicPartition -> {
long largestOffset = currentChunkOffsets.get(topicPartition).getSmallest();
boolean smallestOffsetInclusive = true;
boolean largestOffsetInclusive = true;
long lowerBound = offsetRanges.get(topicPartition).getSmallest();
long smallestOffset = largestOffset - collectors.get(topicPartition).getLimit();
if(smallestOffset < lowerBound) {
smallestOffset = lowerBound;
smallestOffsetInclusive = false;
}
if(largestOffset <= offsetRanges.get(topicPartition).getSmallest()) {
largestOffset = offsetRanges.get(topicPartition).getSmallest();
largestOffsetInclusive = false;
smallestOffset = lowerBound;
smallestOffsetInclusive = false;
}
currentChunkOffsets.put(topicPartition,
OffsetRange.with(smallestOffset, smallestOffsetInclusive, largestOffset, largestOffsetInclusive));
});
}