public CommittedTransactions loadCommitsAfter()

in lagerta-core/src/main/java/com/epam/lagerta/subscriber/lead/LeadStateLoader.java [55:92]


    public CommittedTransactions loadCommitsAfter(long commitId) {
        Stream<TopicPartition> partitionStream;
        try (Consumer<?, ?> consumer = createConsumer()) {
            shiftToLastCommitted(consumer, commitId);
            partitionStream = getTopicPartitionStream(consumer);
        }
        List<ConsumerKeeper> consumerKeepers = Collections.emptyList();
        try {
            consumerKeepers = partitionStream
                    .map(tp -> new ConsumerKeeper(createAndSubscribeConsumer()))
                    .peek(consumerKeeper -> consumerKeeper.consumer().poll(0))
                    .collect(Collectors.toList());
            List<ConsumerKeeper> finalConsumerKeepers = consumerKeepers;
            CommittedTransactions committed = new CommittedTransactions(commitId);
            ForkJoinPool pool = new ForkJoinPool(consumerKeepers.size());
            pool.submit(() -> {
                while (true) {
                    List<List<List<Long>>> collect = finalConsumerKeepers
                            .parallelStream()
                            .filter(ConsumerKeeper::isAlive)
                            .map(this::consumePartitionUntilOffset)
                            .collect(Collectors.toList());
                    collect.stream().flatMap(Collection::stream).forEach(committed::addAll);
                    committed.compress();
                    if (collect.isEmpty()) {
                        break;
                    }
                }
                return committed;
            }).join();
            return committed;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            consumerKeepers.forEach(ConsumerKeeper::close);
            //todo #236
        }
    }