protected RecordFetchResult doReverseFetchByOffsets()

in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [105:127]


    protected RecordFetchResult<K, V> doReverseFetchByOffsets(KafkaConsumer<K, V> consumer, Map<TopicPartition, Long> offsets, long limit, FilterClausePredicate<K, V> filter, long timeoutMs) {
        if(offsets.isEmpty()) {
            return RecordFetchResult.emptyResult();
        }

        Map<TopicPartition, OffsetRange> offsetRanges = fetchOffsetRanges(offsets.keySet());

        Map<TopicPartition, Long> actualOffsets = filterActiveOffsetsReverseFetch(offsets, offsetRanges);
        Map<TopicPartition, Long> exhaustedOffsets = offsets.keySet().stream().filter(
                key -> ! actualOffsets.containsKey(key)).collect(Collectors.toMap(Function.identity(), offsets::get));
        if(actualOffsets.isEmpty()) {
            return RecordFetchResult.emptyResult();
        }

        Map<TopicPartition, BiDirectionalRecordCollector> collectors = populateCollectors(consumer, actualOffsets,
                                                                                          limit, filter, timeoutMs,
                                                                                          offsetRanges);
        collectors.values().forEach(BiDirectionalRecordCollector::sortByOffsets);

        exhaustedOffsets.keySet().forEach(
                topicPartition -> collectors.put(topicPartition, new BiDirectionalRecordCollector(filter, 0L)));
        return toFetchResult(collectors, offsetRanges, OffsetRange::getSmallest);
    }