private void collectRecordsFromCurrentChunk()

in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [193:219]


    /**
     * Polls the consumer in a loop and hands every non-empty batch to
     * {@code collectLastLimitRecords} until a stop condition is met.
     *
     * <p>After a non-empty batch has been collected the loop ends when all
     * collectors report done, when all offsets have reached the end of their
     * range, or when the time budget has expired. After an empty batch only
     * the time budget is checked.
     *
     * @param consumer            consumer that is polled with {@code POLL_TIMEOUT}
     * @param timeoutMs           time budget, evaluated together with {@code fetchStart}
     *                            by {@code isTimeExpired}
     * @param collectors          per-partition collectors that receive the polled records
     * @param fetchStart          start marker of the overall fetch, used for the expiry check
     * @param currentChunkOffsets per-partition offset ranges of the chunk being read
     * @param consumedOffsets     map that receives the consumer positions taken before
     *                            the first poll
     */
    private void collectRecordsFromCurrentChunk(KafkaConsumer<K, V> consumer,
                                                long timeoutMs,
                                                Map<TopicPartition, BiDirectionalRecordCollector> collectors,
                                                long fetchStart,
                                                Map<TopicPartition, OffsetRange> currentChunkOffsets,
                                                Map<TopicPartition, Long> consumedOffsets) {
        // NOTE(review): this is the only write to consumedOffsets in this method; it is
        // not refreshed after a poll, so the end-of-range check below works on these
        // pre-poll positions unless a helper changes its arguments - confirm intended.
        consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));

        boolean stopPolling;
        do {
            ConsumerRecords<K, V> batch = consumer.poll(POLL_TIMEOUT);

            boolean chunkFinished = false;
            if (batch.count() > 0) {
                collectLastLimitRecords(batch, collectors, currentChunkOffsets);

                chunkFinished = areAllCollectorsDone(collectors)
                        || areAllOffsetsReachedEndOfRange(consumedOffsets, currentChunkOffsets)
                        || isTimeExpired(fetchStart, timeoutMs);
            }

            // An empty batch (or an unfinished chunk) still has to respect the time budget.
            stopPolling = chunkFinished || isTimeExpired(fetchStart, timeoutMs);
        } while (!stopPolling);
    }