in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/SubscriberConsumer.java [140:178]
private void pollTransaction() {
ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);
List<TransactionMetadata> metadatas;
if (metadataResendRequested) {
metadataResendRequested = false;
metadatas = new ArrayList<>(buffer.size() + records.count());
for (TransactionWrapper wrapper : buffer.getUncommittedTxs()) {
metadatas.add(wrapper.deserializedMetadata());
}
LOGGER.info("[C] Resending metadata {} from {}", metadatas.size(), f(consumerId));
}
else {
metadatas = new ArrayList<>(records.count());
}
for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
TransactionMetadata metadata = serializer.deserialize(record.key());
TransactionWrapper wrapper = new TransactionWrapper(record, metadata);
metadatas.add(metadata);
buffer.add(wrapper);
}
Collections.sort(metadatas, COMPARATOR);
if (!metadatas.isEmpty()) {
LOGGER.debug("[C] Before send {} from {}", metadatas, f(consumerId));
}
LeadResponse response = lead.notifyTransactionsRead(consumerId, metadatas);
if (response != null) {
if (response.getAlreadyProcessedIds() != null) {
buffer.markAlreadyCommitted(response.getAlreadyProcessedIds());
LOGGER.debug("[C] Remove {} in {}", response.getAlreadyProcessedIds(), f(consumerId));
}
if (response.getToCommitIds() != null) {
LOGGER.debug("[C] Gotcha {} in {}", response.getToCommitIds(), f(consumerId));
buffer.markInProgress(response.getToCommitIds());
committer.commitAsync(response.getToCommitIds(), deserializerClosure, doneNotifier, fullCommitHandler);
}
}
}