in lagerta-core/src/main/java/com/epam/lagerta/subscriber/ParallelCommitStrategy.java [132:159]
/**
 * Worker loop that drains {@code tasks} while {@code count} permits remain.
 *
 * <p>For every relation taken from {@code tasks}:
 * <ul>
 *   <li>if it is alive and {@code commitServitor.commit(...)} succeeds, each dependent
 *       whose {@code release()} returns {@code true} is added to {@code tasks};</li>
 *   <li>if it is alive but the commit fails, {@code deadHasRisen} is set and the relation
 *       is killed, then processing falls through to the next case;</li>
 *   <li>otherwise every dependent is killed, and each dependent whose {@code release()}
 *       returns {@code true} is still added to {@code tasks} so that the kill propagates
 *       further down the dependency chain.</li>
 * </ul>
 *
 * <p>On interruption the exception is logged, the thread's interrupt flag is restored and
 * the loop ends.
 */
private void execute() {
    try {
        // One permit is consumed from count per relation processed.
        while (count.getAndDecrement() > 0) {
            TransactionRelation relation = tasks.take();
            if (relation.isAlive()) {
                if (commitServitor.commit(relation.getId(), buffer)) {
                    // Successful commit: schedule dependents that report release() == true.
                    relation
                            .dependent()
                            .stream()
                            .filter(TransactionRelation::release)
                            .forEach(tasks::add);
                    continue;
                }
                // Commit failed: remember the failure and treat this relation as dead.
                deadHasRisen = true;
                relation.kill();
            }
            // Dead relation (or failed commit): kill every dependent, then schedule the
            // ones whose release() returns true. Explicit body instead of Stream.peek,
            // because the kill is a required side effect, not a debugging hook.
            relation
                    .dependent()
                    .stream()
                    .forEach(dependent -> {
                        dependent.kill();
                        if (dependent.release()) {
                            tasks.add(dependent);
                        }
                    });
        }
    } catch (InterruptedException | IgniteInterruptedException e) {
        LOGGER.error("[R] Exception while committing with ParallelCommitStrategy", e);
        // Restore the interrupt status (after logging) so that callers higher up the
        // stack can still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
    }
}