in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [272:295]
public boolean collectLastLimitRecords(ConsumerRecords<K, V> records,
Map<TopicPartition, BiDirectionalRecordCollector> collectors,
Map<TopicPartition, OffsetRange> currentChunkOffsets) {
boolean anythingCollected = false;
for(Map.Entry<TopicPartition, BiDirectionalRecordCollector> entry : collectors.entrySet()) {
TopicPartition topicPartition = entry.getKey();
if(currentChunkOffsets.get(topicPartition).getSize() > 0) {
BiDirectionalRecordCollector biDirectionalCollector = entry.getValue();
for(ConsumerRecord<K, V> record : reversedMessages(records.records(topicPartition))) {
if(record.offset() > currentChunkOffsets.get(topicPartition).getLargest() ||
record.offset() < currentChunkOffsets.get(topicPartition).getSmallest()) {
continue;
}
if(biDirectionalCollector.isLimitReached()) {
break;
}
biDirectionalCollector.add(record);
anythingCollected = true;
}
}
}
return anythingCollected;
}