in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [261:299]
private Map<TopicPartition, RecordCollector> populateCollectors(KafkaConsumer<K, V> consumer,
Map<TopicPartition, Long> offsets,
long limit, FilterClausePredicate<K, V> filter,
long timeoutMs, Map<TopicPartition,
OffsetRange> offsetRanges) {
List<TopicPartition> partitionsAtBeginning = partitionsAtBeginning(offsets, offsetRanges);
Map<TopicPartition, RecordCollector> collectors = initRecordCollectorsForPartitions(
offsets.keySet(), filter, limit);
assignConsumerToPartitionsAndSeekOffsets(consumer, offsets);
long fetchStart = System.currentTimeMillis();
Map<TopicPartition, Long> consumedOffsets = new HashMap<>();
while (true) {
ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT);
if (records.count() > 0) {
collectRecords(records, collectors);
consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));
if ( areAllCollectorsDone(collectors) ||
areAllOffsetsReachedEndOfRange(consumedOffsets, offsetRanges)) {
break;
}
}
if (isTimeExpired(fetchStart, timeoutMs)) {
break;
}
}
if(!partitionsAtBeginning.isEmpty()) {
correctSmallestBoundsOfOffsetRanges(partitionsAtBeginning, collectors, offsetRanges);
}
return collectors;
}