in lagerta-core/src/main/java/com/epam/lagerta/subscriber/lead/ReadTransactions.java [154:184]
private void mergeCollections(Heartbeats heartbeats, Set<UUID> lostReaders, Set<Long> inProgress) {
List<ReaderTxScope> mergedBuffer = MergeUtil.mergeBuffer(buffer, SCOPE_COMPARATOR);
if (!duplicatesPruningScheduled && lostReaders.isEmpty()) {
MergeUtil.merge(scopes, mergedBuffer, SCOPE_COMPARATOR);
} else {
Set<UUID> diedReaders = mergeWithDeduplication(scopes, mergedBuffer, SCOPE_COMPARATOR, lostReaders);
boolean someoneDied = !diedReaders.isEmpty();
if (someoneDied) {
diedReaders
.stream()
.peek(lostReaders::remove)
.forEach(heartbeats::removeDead);
scopes
.stream()
.filter(scope -> diedReaders.contains(scope.getReaderId()))
.forEach(ReaderTxScope::markOrphan);
}
if (someoneDied || duplicatesPruningScheduled) {
Consumer<ReaderTxScope> onRemove = tx -> {
if (tx.isInProgress() && tx.isOrphan()) {
inProgress.remove(tx.getTransactionId());
}
};
deduplicate(lostReaders, scopes, onRemove);
duplicatesPruningScheduled = false;
}
}
buffer = new ArrayList<>(INITIAL_CAPACITY);
}