in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/lead/Lead.java [193:213]
public LeadResponse notifyTransactionsRead(UUID consumerId, List<TransactionMetadata> metadatas) {
if (running) {
if (metadatas.size() > 0) {
LOGGER.debug("[L] Got {} from consumer {}", metadatas, f(consumerId));
}
enqueueTask(new NotifyTask(consumerId, pingManager, planner, metadatas));
if (kafkaIsOutOfOrder && !metadatas.isEmpty()) {
enqueueTask(new KafkaResurrectionCheckTask(this, dataRecoveryConfig, kafkaFactory));
}
LeadResponse result = availableWorkBuffer.remove(consumerId);
if (result != null) {
LOGGER.debug("[L] Return {} for {}", result, f(consumerId));
}
return result == null ? LeadResponse.EMPTY : result;
} else {
if (metadatas.size() > 0) {
LOGGER.debug("[L] Got {} from consumer {} while dead", metadatas, f(consumerId));
}
}
return LeadResponse.EMPTY;
}