in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [193:219]
private void collectRecordsFromCurrentChunk(KafkaConsumer<K, V> consumer,
long timeoutMs,
Map<TopicPartition, BiDirectionalRecordCollector> collectors,
long fetchStart,
Map<TopicPartition, OffsetRange> currentChunkOffsets,
Map<TopicPartition, Long> consumedOffsets) {
consumedOffsets.putAll(KafkaUtils.getConsumerPositions(consumer));
while(true) {
ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT);
if(records.count() > 0) {
collectLastLimitRecords(records, collectors, currentChunkOffsets);
if( areAllCollectorsDone(collectors) ||
areAllOffsetsReachedEndOfRange(consumedOffsets, currentChunkOffsets) ||
isTimeExpired(fetchStart, timeoutMs)) {
break;
}
}
if(isTimeExpired(fetchStart,timeoutMs)) {
break;
}
}
}