public RecordFetchResult fetchByOffsets()

in src/main/java/com/epam/eco/commons/kafka/helpers/CachedTopicRecordFetcher.java [69:89]


    /**
     * Builds a fetch result for every partition that appears as a key in {@code offsets}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>calls {@code populateCacheForAllPartitions} with the requested offsets and the timeout;</li>
     *   <li>asks {@code calculateLimitsByPartition} for a per-partition limit derived from the
     *       partition set and {@code limit};</li>
     *   <li>calls {@code buildFetchResult} once per partition and collects the outcomes.</li>
     * </ol>
     *
     * <p>NOTE(review): {@code filter} is accepted but never referenced in this method body —
     * confirm whether filtering is intentionally unsupported on this code path.
     *
     * @param offsets        partitions to fetch, each mapped to the offset handed to
     *                       {@code buildFetchResult}
     * @param limit          overall limit passed to {@code calculateLimitsByPartition}
     *                       (presumably a record count split across partitions — TODO confirm)
     * @param filter         not used by this method (see note above)
     * @param timeoutInMs    timeout forwarded to {@code populateCacheForAllPartitions}
     * @param fetchDirection direction forwarded to {@code buildFetchResult}
     * @return a {@code RecordFetchResult} wrapping one {@code PartitionRecordFetchResult}
     *         per key of {@code offsets}
     */
    public RecordFetchResult<K, V> fetchByOffsets(Map<TopicPartition, Long> offsets,
                                                  long limit,
                                                  FilterClausePredicate<K,V> filter,
                                                  long timeoutInMs,
                                                  FetchDirection fetchDirection) {

        populateCacheForAllPartitions(offsets, timeoutInMs);

        Map<TopicPartition, Long> perPartitionLimits =
                calculateLimitsByPartition(offsets.keySet(), limit);

        Map<TopicPartition, PartitionRecordFetchResult<K, V>> fetchedByPartition = new HashMap<>();
        for (TopicPartition partition : offsets.keySet()) {
            Long startOffset = offsets.get(partition);
            Long partitionLimit = perPartitionLimits.get(partition);
            PartitionRecordFetchResult<K, V> partitionResult =
                    buildFetchResult(partition, startOffset, partitionLimit, fetchDirection);
            fetchedByPartition.put(partition, partitionResult);
        }

        return new RecordFetchResult<>(fetchedByPartition);
    }