in core/src/main/java/org/apache/ignite/activestore/impl/publisher/ReconciliationWriter.java [54:80]
public void start() {
running = true;
Runnable onStop = new Runnable() {
@Override
public void run() {
//todo add logic
}
};
try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(localConsumerProperties, onStop);
Producer<ByteBuffer, ByteBuffer> producer = kafkaFactory.producer(replicaProducerProperties)) {
int partitions = producer.partitionsFor(reconciliationTopic).size();
consumer.subscribe(Collections.singletonList(localTopic));
while (running) {
ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);
for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
long transactionId = record.timestamp();
int partition = TransactionMessageUtil.partitionFor(transactionId, partitions);
producer.send(new ProducerRecord<>(reconciliationTopic, partition, record.key(), record.value()));
}
}
}
}