in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/ConsumerPingManager.java [53:78]
public void checkPing() {
long currentTime = System.currentTimeMillis();
if (!consumerOutOfOrder.isEmpty()) {
Set<UUID> cleaned = planner.getFinallyDeadConsumers(consumerOutOfOrder);
LOGGER.info("[L] Consumers {} went to graveyard", cleaned);
availableWorkBuffer.keySet().removeAll(cleaned);
consumerOutOfOrder.removeAll(cleaned);
for (UUID consumerId : cleaned) {
pingTimes.remove(consumerId);
}
staleConsumers.removeAll(cleaned);
}
pingTimes.forEachKeyValue((consumerId, pingTime) -> {
if (strategy.isOutOfOrder(currentTime, pingTime)) {
if (consumerOutOfOrder.add(consumerId)) {
LOGGER.info("[L] Consumer {} is out of order", f(consumerId));
staleConsumers.add(consumerId);
planner.registerOutOfOrderConsumer(consumerId);
}
}
else if (consumerOutOfOrder.remove(consumerId)) {
planner.reuniteConsumer(consumerId);
}
});
}