private void calculateNextChunkRange()

in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [233:259]


    /**
     * Slides every partition's chunk window one step towards lower offsets, in place.
     *
     * <p>For each partition in {@code currentChunkOffsets} the new window ends at the
     * smallest offset of the current window and starts {@code getLimit()} offsets below
     * that end, where the limit is taken from the partition's collector. The start is never
     * allowed to fall below the smallest offset of the partition's entry in
     * {@code offsetRanges}; whenever a bound has to be pinned to that floor it is flagged as
     * exclusive, otherwise it is inclusive.
     *
     * @param currentChunkOffsets per-partition chunk windows; each value is replaced with the
     *                            newly computed window
     * @param offsetRanges        per-partition overall ranges; only their smallest offset is
     *                            read, as the floor for the new window
     * @param collectors          per-partition collectors; only {@code getLimit()} is read,
     *                            as the window size
     */
    private void calculateNextChunkRange(Map<TopicPartition, OffsetRange> currentChunkOffsets,
                                   Map<TopicPartition, OffsetRange> offsetRanges,
                                   Map<TopicPartition, BiDirectionalRecordCollector> collectors) {
        for (Map.Entry<TopicPartition, OffsetRange> chunk : currentChunkOffsets.entrySet()) {
            TopicPartition partition = chunk.getKey();

            // The new window ends where the current one begins.
            long upper = chunk.getValue().getSmallest();
            long floor = offsetRanges.get(partition).getSmallest();
            long lower = upper - collectors.get(partition).getLimit();

            // The current window already sits at (or below) the floor: both bounds get pinned.
            boolean upperPinned = upper <= floor;
            // The start is pinned either for that reason or because the step overshot the floor.
            boolean lowerPinned = upperPinned || lower < floor;

            if (lowerPinned) {
                lower = floor;
            }
            if (upperPinned) {
                upper = floor;
            }

            // A pinned bound is exclusive; an untouched bound stays inclusive.
            chunk.setValue(OffsetRange.with(lower, !lowerPinned, upper, !upperPinned));
        }
    }