private Map populateCollectors()

in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [261:299]


    private Map<TopicPartition, RecordCollector> populateCollectors(KafkaConsumer<K, V> consumer,
                                                                    Map<TopicPartition, Long> offsets,
                                                                    long limit, FilterClausePredicate<K, V> filter,
                                                                    long timeoutMs, Map<TopicPartition,
                                                                    OffsetRange> offsetRanges) {

        List<TopicPartition> partitionsAtBeginning = partitionsAtBeginning(offsets, offsetRanges);

        Map<TopicPartition, RecordCollector> collectors = initRecordCollectorsForPartitions(
                offsets.keySet(), filter, limit);

        assignConsumerToPartitionsAndSeekOffsets(consumer, offsets);

        long fetchStart = System.currentTimeMillis();

        Map<TopicPartition, Long> consumedOffsets = new HashMap<>();
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT);
            if (records.count() > 0) {
                collectRecords(records, collectors);

                consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));

                if ( areAllCollectorsDone(collectors) ||
                     areAllOffsetsReachedEndOfRange(consumedOffsets, offsetRanges)) {
                    break;
                }
            }

            if (isTimeExpired(fetchStart, timeoutMs)) {
                break;
            }
        }

        if(!partitionsAtBeginning.isEmpty()) {
            correctSmallestBoundsOfOffsetRanges(partitionsAtBeginning, collectors, offsetRanges);
        }
        return collectors;
    }