in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [129:158]
/**
 * Drives the chunk-by-chunk fetch loop and returns the per-partition collectors.
 *
 * <p>Sequence: collectors are created for the partitions in {@code offsets}, the consumer
 * is assigned to those partitions, the first chunk ranges are computed, and then the loop
 * repeatedly (1) seeks to the beginning of the current chunk, (2) collects records from it,
 * and (3) computes the next chunk range. The loop body always runs at least once; the stop
 * conditions are evaluated only after a chunk has been processed.
 *
 * <p>The loop ends as soon as any of the following helper checks returns {@code true}
 * (evaluated in this order, short-circuiting): all current chunks are empty, all consumed
 * offsets reached the end of their range, all collectors are done, or the time budget
 * has expired.
 *
 * @param consumer     consumer used for seeking and record collection
 * @param offsets      per-partition offsets; its key set defines the partitions processed
 *                     (exact meaning of the offset values is defined by the helpers —
 *                     presumably the starting point of the fetch; verify against callers)
 * @param limit        passed to collector initialization (presumably a per-partition
 *                     record limit — TODO confirm)
 * @param filter       predicate passed to collector initialization
 * @param timeoutMs    time budget in milliseconds, measured from just after partition assignment
 * @param offsetRanges per-partition offset ranges used for chunk calculation and the
 *                     end-of-range check
 * @return the collectors created for the partitions of {@code offsets}
 */
private Map<TopicPartition, BiDirectionalRecordCollector> populateCollectors(
        KafkaConsumer<K, V> consumer,
        Map<TopicPartition, Long> offsets,
        long limit,
        FilterClausePredicate<K, V> filter,
        long timeoutMs,
        Map<TopicPartition, OffsetRange> offsetRanges) {
    Map<TopicPartition, BiDirectionalRecordCollector> collectorsByPartition =
            initBiDirectionalRecordCollectorsForPartitions(offsets.keySet(), filter, limit);
    assignConsumerToPartitions(consumer, offsets);

    // The timeout clock starts only after the consumer has been assigned.
    long startedAtMs = System.currentTimeMillis();
    Map<TopicPartition, OffsetRange> chunkRanges =
            getFirstChunkRanges(offsets, offsetRanges, collectorsByPartition);
    Map<TopicPartition, Long> consumedByPartition = new HashMap<>();

    boolean finished;
    do {
        seekOffsetsOfCurrentChunkBeginning(consumer, chunkRanges);
        collectRecordsFromCurrentChunk(
                consumer, timeoutMs, collectorsByPartition, startedAtMs,
                chunkRanges, consumedByPartition);
        // The same chunkRanges map is passed on every iteration; the next range is
        // presumably written into it in place — verify in calculateNextChunkRange.
        calculateNextChunkRange(chunkRanges, offsetRanges, collectorsByPartition);

        // Check order matters only for short-circuiting; keep it as is.
        finished = allCurrentChunksIsEmpty(chunkRanges)
                || areAllOffsetsReachedEndOfRange(consumedByPartition, offsetRanges)
                || areAllCollectorsDone(collectorsByPartition)
                || isTimeExpired(startedAtMs, timeoutMs);
    } while (!finished);

    return collectorsByPartition;
}