in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [105:127]
/**
 * Runs the reverse fetch for the requested per-partition offsets and assembles the result.
 *
 * <p>Partitions present in {@code offsets} but absent from the map returned by
 * {@code filterActiveOffsetsReverseFetch} are not fetched; each of them gets a placeholder
 * collector (constructed with {@code 0L} as its second argument) that is handed to
 * {@code toFetchResult} together with the populated collectors.
 *
 * @param consumer  consumer handed to {@code populateCollectors}
 * @param offsets   requested offset per partition; an empty map yields an empty result
 * @param limit     passed through to {@code populateCollectors}
 * @param filter    predicate passed to {@code populateCollectors} and to the placeholder collectors
 * @param timeoutMs passed through to {@code populateCollectors}
 * @return the assembled fetch result, or {@code RecordFetchResult.emptyResult()} when
 *         {@code offsets} is empty or no partition remains after filtering
 */
protected RecordFetchResult<K, V> doReverseFetchByOffsets(
        KafkaConsumer<K, V> consumer,
        Map<TopicPartition, Long> offsets,
        long limit,
        FilterClausePredicate<K, V> filter,
        long timeoutMs) {
    if (offsets.isEmpty()) {
        return RecordFetchResult.emptyResult();
    }

    Map<TopicPartition, OffsetRange> offsetRanges = fetchOffsetRanges(offsets.keySet());
    Map<TopicPartition, Long> actualOffsets = filterActiveOffsetsReverseFetch(offsets, offsetRanges);
    // NOTE(review): when every partition is filtered out the result carries no per-partition
    // entries at all, unlike the mixed case below where placeholders are added - confirm intended.
    if (actualOffsets.isEmpty()) {
        return RecordFetchResult.emptyResult();
    }

    // Snapshot the filtered-out partitions before populateCollectors runs. Only the partition
    // keys matter here, so the requested offset values are deliberately not looked up
    // (Collectors.toMap would reject a null offset value).
    Map<TopicPartition, BiDirectionalRecordCollector> exhaustedCollectors = offsets.keySet().stream()
            .filter(topicPartition -> !actualOffsets.containsKey(topicPartition))
            .collect(Collectors.toMap(
                    Function.identity(),
                    topicPartition -> new BiDirectionalRecordCollector(filter, 0L)));

    Map<TopicPartition, BiDirectionalRecordCollector> collectors = populateCollectors(
            consumer, actualOffsets, limit, filter, timeoutMs, offsetRanges);
    collectors.values().forEach(BiDirectionalRecordCollector::sortByOffsets);
    // Placeholders are added after sorting, as before; they overwrite any same-key entry.
    collectors.putAll(exhaustedCollectors);

    return toFetchResult(collectors, offsetRanges, OffsetRange::getSmallest);
}