in src/main/java/com/epam/eco/commons/kafka/helpers/CachedTopicRecordFetcher.java [69:89]
/**
 * Builds a fetch result for every partition present in {@code offsets}.
 *
 * <p>The method first calls {@code populateCacheForAllPartitions} with the requested
 * offsets and timeout, then obtains a per-partition limit from
 * {@code calculateLimitsByPartition}, and finally delegates to {@code buildFetchResult}
 * once per partition, collecting the outcomes into a single {@link RecordFetchResult}.
 *
 * <p>NOTE(review): {@code filter} is not referenced anywhere in this method body —
 * confirm whether filtering is expected to be applied here.
 *
 * @param offsets        offset per partition; its key set determines which partitions
 *                       appear in the result
 * @param limit          overall limit handed to {@code calculateLimitsByPartition}
 *                       together with the partition set
 * @param filter         currently unused by this method (see note above)
 * @param timeoutInMs    timeout forwarded to {@code populateCacheForAllPartitions};
 *                       presumably milliseconds, judging by the name
 * @param fetchDirection direction forwarded unchanged to {@code buildFetchResult}
 * @return a {@link RecordFetchResult} wrapping one partition result per key of
 *         {@code offsets}
 */
public RecordFetchResult<K, V> fetchByOffsets(
        Map<TopicPartition, Long> offsets,
        long limit,
        FilterClausePredicate<K,V> filter,
        long timeoutInMs,
        FetchDirection fetchDirection) {
    // The cache is populated before any limits or per-partition results are computed.
    populateCacheForAllPartitions(offsets, timeoutInMs);

    Map<TopicPartition, Long> perPartitionLimits =
            calculateLimitsByPartition(offsets.keySet(), limit);

    Map<TopicPartition, PartitionRecordFetchResult<K, V>> fetchedByPartition = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> requested : offsets.entrySet()) {
        TopicPartition partition = requested.getKey();
        fetchedByPartition.put(
                partition,
                buildFetchResult(
                        partition,
                        requested.getValue(),
                        perPartitionLimits.get(partition),
                        fetchDirection));
    }

    return new RecordFetchResult<>(fetchedByPartition);
}