protected RecordFetchResult doFetchByOffsets()

in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [230:259]


    protected RecordFetchResult<K, V> doFetchByOffsets(
            KafkaConsumer<K, V> consumer,
            Map<TopicPartition, Long> offsets,
            long limit,
            FilterClausePredicate<K,V> filter,
            long timeoutMs) {
        if (offsets.isEmpty()) {
            return RecordFetchResult.emptyResult();
        }

        Map<TopicPartition, OffsetRange> offsetRanges = fetchOffsetRanges(offsets.keySet());

        Map<TopicPartition, Long> actualOffsets = filterOutActualPartitions(offsets, offsetRanges);
        Map<TopicPartition, Long> exhaustedOffsets = offsets.keySet().stream()
                                                            .filter(key->!actualOffsets.containsKey(key))
                                                            .collect(Collectors.toMap(Function.identity(), offsets::get));

        if (actualOffsets.isEmpty()) {
            return RecordFetchResult.emptyResult();
        }

        Map<TopicPartition, RecordCollector> collectors = populateCollectors(
                consumer, actualOffsets, limit, filter, timeoutMs, offsetRanges);

        exhaustedOffsets.keySet().forEach(topicPartition->
            collectors.put(topicPartition, new RecordCollector(filter,0L))
        );

        return toFetchResult(collectors, offsetRanges, OffsetRange::getLargest);
    }