in src/main/java/com/epam/eco/commons/kafka/helpers/TopicRecordFetcher.java [378:397]
private boolean collectRecords(
ConsumerRecords<K, V> records,
Map<TopicPartition, RecordCollector> collectors) {
boolean anythingCollected = false;
for (Entry<TopicPartition, RecordCollector> entry : collectors.entrySet()) {
TopicPartition topicPartition = entry.getKey();
RecordCollector collector = entry.getValue();
if (collector.isLimitReached()) {
continue;
}
for (ConsumerRecord<K, V> record : records.records(topicPartition)) {
if (collector.isLimitReached()) {
break;
}
collector.add(record);
anythingCollected = true;
}
}
return anythingCollected;
}