in lagerta-core/src/main/java/com/epam/lagerta/subscriber/lead/LeadStateLoader.java [55:92]
/**
 * Loads the transactions committed after the given commit id.
 *
 * <p>A temporary consumer is used to shift to the last committed position and to obtain the
 * stream of topic partitions. One {@link ConsumerKeeper} is then created (and polled once) per
 * partition. All keepers that still report themselves alive are consumed in parallel, round
 * after round; the results of each round are added to the returned
 * {@link CommittedTransactions} and compressed. The loop ends when a round finds no alive keeper.
 *
 * <p>All created keepers are closed and the worker pool is shut down before returning,
 * whether the load succeeds or fails.
 *
 * @param commitId commit id handed to {@code shiftToLastCommitted} and used to create the result
 * @return the accumulated committed transactions (empty rounds only if there are no partitions)
 * @throws RuntimeException wrapping any exception thrown after the partition stream was obtained
 */
public CommittedTransactions loadCommitsAfter(long commitId) {
    Stream<TopicPartition> partitionStream;
    try (Consumer<?, ?> consumer = createConsumer()) {
        shiftToLastCommitted(consumer, commitId);
        // NOTE(review): the stream is consumed after this consumer is closed; this assumes
        // getTopicPartitionStream does not lazily read from the consumer - confirm.
        partitionStream = getTopicPartitionStream(consumer);
    }
    // Fully qualified so that this block does not require a new import in the file.
    List<ConsumerKeeper> consumerKeepers = new java.util.ArrayList<>();
    ForkJoinPool pool = null;
    try {
        // Register every keeper before polling it, so that a failure while creating or polling
        // a later consumer still leaves the earlier ones visible to the finally block.
        partitionStream.forEachOrdered(tp -> {
            ConsumerKeeper keeper = new ConsumerKeeper(createAndSubscribeConsumer());
            consumerKeepers.add(keeper);
            keeper.consumer().poll(0);
        });
        CommittedTransactions committed = new CommittedTransactions(commitId);
        // ForkJoinPool rejects a parallelism of 0, which would otherwise fail when there are
        // no partitions at all.
        pool = new ForkJoinPool(Math.max(1, consumerKeepers.size()));
        // Running inside the dedicated pool makes the parallel stream use that pool's workers.
        pool.submit(() -> {
            while (true) {
                List<List<List<Long>>> roundResults = consumerKeepers
                        .parallelStream()
                        .filter(ConsumerKeeper::isAlive)
                        .map(this::consumePartitionUntilOffset)
                        .collect(Collectors.toList());
                roundResults.stream().flatMap(Collection::stream).forEach(committed::addAll);
                committed.compress();
                // An empty round means no keeper was alive any more: nothing left to read.
                if (roundResults.isEmpty()) {
                    break;
                }
            }
            return committed;
        }).join();
        return committed;
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        // Stop the workers first so no task is still using a consumer while it is closed.
        if (pool != null) {
            pool.shutdownNow();
        }
        consumerKeepers.forEach(ConsumerKeeper::close);
        //todo #236
    }
}