in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [230:259]
protected RecordFetchResult<K, V> doFetchByOffsets(
KafkaConsumer<K, V> consumer,
Map<TopicPartition, Long> offsets,
long limit,
FilterClausePredicate<K,V> filter,
long timeoutMs) {
if (offsets.isEmpty()) {
return RecordFetchResult.emptyResult();
}
Map<TopicPartition, OffsetRange> offsetRanges = fetchOffsetRanges(offsets.keySet());
Map<TopicPartition, Long> actualOffsets = filterOutActualPartitions(offsets, offsetRanges);
Map<TopicPartition, Long> exhaustedOffsets = offsets.keySet().stream()
.filter(key->!actualOffsets.containsKey(key))
.collect(Collectors.toMap(Function.identity(), offsets::get));
if (actualOffsets.isEmpty()) {
return RecordFetchResult.emptyResult();
}
Map<TopicPartition, RecordCollector> collectors = populateCollectors(
consumer, actualOffsets, limit, filter, timeoutMs, offsetRanges);
exhaustedOffsets.keySet().forEach(topicPartition->
collectors.put(topicPartition, new RecordCollector(filter,0L))
);
return toFetchResult(collectors, offsetRanges, OffsetRange::getLargest);
}