public boolean collectLastLimitRecords()

in src/main/java/com/epam/eco/commons/kafka/helpers/BiDirectionalTopicRecordFetcher.java [272:295]


    /**
     * Feeds polled records into the per-partition collectors, restricted to each
     * partition's current chunk of offsets.
     *
     * <p>For every partition that has a collector and whose chunk range reports a
     * positive size, the partition's records are visited in the order produced by
     * {@code reversedMessages} (presumably last-to-first; confirm against that helper).
     * Records whose offset lies outside {@code [smallest, largest]} of the chunk are
     * skipped; iteration over the partition stops at the first in-range record seen
     * after the collector reports that its limit is reached.
     *
     * @param records             records returned by the consumer
     * @param collectors          collector for each partition of interest
     * @param currentChunkOffsets offset range of the current chunk per partition; an entry
     *                            is dereferenced for every key of {@code collectors}, so a
     *                            missing entry results in a {@link NullPointerException}
     * @return {@code true} if at least one record was added to any collector
     */
    public boolean collectLastLimitRecords(ConsumerRecords<K, V> records,
                                            Map<TopicPartition, BiDirectionalRecordCollector> collectors,
                                            Map<TopicPartition, OffsetRange> currentChunkOffsets) {
        boolean collectedAny = false;

        for (Map.Entry<TopicPartition, BiDirectionalRecordCollector> collectorEntry : collectors.entrySet()) {
            TopicPartition partition = collectorEntry.getKey();
            OffsetRange chunk = currentChunkOffsets.get(partition);

            // Nothing to collect for a partition whose current chunk is empty.
            boolean chunkHasOffsets = chunk.getSize() > 0;
            if (!chunkHasOffsets) {
                continue;
            }

            BiDirectionalRecordCollector collector = collectorEntry.getValue();
            for (ConsumerRecord<K, V> candidate : reversedMessages(records.records(partition))) {
                long offset = candidate.offset();

                // Only records inside the chunk's inclusive offset bounds are eligible.
                boolean outsideChunk = offset > chunk.getLargest() || offset < chunk.getSmallest();
                if (outsideChunk) {
                    continue;
                }

                // The limit is checked only once an eligible record is found.
                if (collector.isLimitReached()) {
                    break;
                }

                collector.add(candidate);
                collectedAny = true;
            }
        }

        return collectedAny;
    }