private Map populateCollectors()

in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [129:158]


    private Map<TopicPartition, BiDirectionalRecordCollector> populateCollectors(KafkaConsumer<K, V> consumer, Map<TopicPartition, Long> offsets, long limit, FilterClausePredicate<K, V> filter, long timeoutMs, Map<TopicPartition, OffsetRange> offsetRanges) {
        Map<TopicPartition, BiDirectionalRecordCollector> collectors = initBiDirectionalRecordCollectorsForPartitions(
                offsets.keySet(), filter, limit);

        assignConsumerToPartitions(consumer, offsets);

        long fetchStart = System.currentTimeMillis();

        Map<TopicPartition, OffsetRange> currentChunkOffsetRanges = getFirstChunkRanges(offsets, offsetRanges, collectors);

        Map<TopicPartition, Long> consumedOffsets = new HashMap<>();

        while(true) {

            seekOffsetsOfCurrentChunkBeginning(consumer, currentChunkOffsetRanges);
            collectRecordsFromCurrentChunk(consumer, timeoutMs, collectors, fetchStart,
                                           currentChunkOffsetRanges, consumedOffsets);

            calculateNextChunkRange(currentChunkOffsetRanges, offsetRanges, collectors);

            if( allCurrentChunksIsEmpty(currentChunkOffsetRanges) ||
                areAllOffsetsReachedEndOfRange(consumedOffsets, offsetRanges) ||
                areAllCollectorsDone(collectors) ||
                isTimeExpired(fetchStart, timeoutMs)) {
                break;
            }

        }
        return collectors;
    }