in core/src/main/java/org/apache/ignite/activestore/impl/subscriber/consumer/SubscriberConsumer.java [78:105]
public SubscriberConsumer(
    LeadService lead,
    Serializer serializer,
    Committer committer,
    TransactionsBuffer buffer,
    DoneNotifier doneNotifier,
    FullCommitHandler fullCommitHandler,
    DeserializerClosure deserializerClosure,
    BufferOverflowCondition bufferOverflowCondition,
    KafkaFactory kafkaFactory,
    DataRecoveryConfig dataRecoveryConfig,
    @Named(DataCapturerBusConfiguration.NODE_ID) UUID consumerId,
    OnKafkaStop onKafkaStop
) {
    // Collaborators that are stored exactly as they were passed in.
    this.lead = lead;
    this.serializer = serializer;
    this.committer = committer;
    this.buffer = buffer;

    // These three helpers are handed the same transactions buffer; the value
    // returned by each setBuffer(...) call is what gets kept in the field.
    this.doneNotifier = doneNotifier.setBuffer(buffer);
    this.fullCommitHandler = fullCommitHandler.setBuffer(buffer);
    this.deserializerClosure = deserializerClosure.setBuffer(buffer);

    this.bufferOverflowCondition = bufferOverflowCondition;

    // The consumer is obtained from the factory using the consumer settings
    // exposed by the recovery config, together with the supplied onKafkaStop.
    this.consumer = kafkaFactory.consumer(
        dataRecoveryConfig.getConsumerConfig(),
        onKafkaStop
    );

    // Identifier injected under the NODE_ID name.
    this.consumerId = consumerId;

    // Topic names are read once from the recovery config at construction time.
    this.remoteTopic = dataRecoveryConfig.getRemoteTopic();
    this.reconciliationTopic = dataRecoveryConfig.getReconciliationTopic();
}