in lagerta-core/src/main/java/com/epam/lagerta/subscriber/util/MergeUtil.java [59:81]
public static <T> List<T> mergeBuffer(
List<List<T>> buffer,
Comparator<T> comparator) {
if (buffer.isEmpty()) {
return Collections.emptyList();
}
int size = buffer.size();
double deep = Math.ceil((Math.log(size) / LOG2));
int prevStep = 1;
for (int i = 0; i < deep; i++) {
int step = prevStep * 2;
List<ForkJoinTask> tasks = new ArrayList<>();
for (int j = prevStep; j < size; j += step) {
final int current = j - prevStep;
final int next = j;
Runnable task = () -> merge(buffer.get(current), buffer.get(next), comparator);
tasks.add(pool.submit(task));
}
tasks.forEach(ForkJoinTask::join);
prevStep = step;
}
return buffer.get(0);
}