in src/main/java/com/epam/eco/commons/kafka/helpers/CachedTopicRecordFetcher.java [92:112]
public RecordFetchResult<K, V> fetchByTimestamps(Map<TopicPartition, Long> timestamps,
long limit,
FilterClausePredicate<K,V> filter,
long timeoutInMs,
FetchDirection direction) {
Map<TopicPartition, Long> limitsPerPartition = calculateLimitsByPartition(
timestamps.keySet(), limit);
populateCacheForAllPartitions(timestamps, timeoutInMs);
Map<TopicPartition, PartitionRecordFetchResult<K, V>> resultMap = new HashMap<>();
timestamps.keySet().forEach(topicPartition ->
resultMap.put(topicPartition,
buildFetchResultByTimestamp(topicPartition,
timestamps.get(topicPartition),
limitsPerPartition.get( topicPartition),
direction)));
return new RecordFetchResult<>(resultMap);
}