public void start()

in core/src/main/java/org/apache/ignite/activestore/impl/publisher/ReconciliationWriter.java [54:80]


    /**
     * Runs the forwarding loop: reads records from {@code localTopic} and re-sends each of them
     * to {@code reconciliationTopic}.
     * <p>
     * The target partition of every record is derived from its timestamp, which this method uses
     * as the transaction id. Polling continues for as long as the {@code running} flag is set;
     * the consumer and the producer are both closed when the method exits.
     */
    public void start() {
        running = true;

        // Callback handed to the consumer factory; it currently has no behavior.
        Runnable stopHook = new Runnable() {
            @Override
            public void run() {
                //todo add logic
            }
        };

        try (Consumer<ByteBuffer, ByteBuffer> source = kafkaFactory.consumer(localConsumerProperties, stopHook);
             Producer<ByteBuffer, ByteBuffer> sink = kafkaFactory.producer(replicaProducerProperties)) {
            // Looked up once before the loop and reused for every forwarded record.
            int partitionCount = sink.partitionsFor(reconciliationTopic).size();

            source.subscribe(Collections.singletonList(localTopic));
            while (running) {
                ConsumerRecords<ByteBuffer, ByteBuffer> batch = source.poll(POLL_TIMEOUT);

                for (ConsumerRecord<ByteBuffer, ByteBuffer> entry : batch) {
                    // The record timestamp is used as the transaction id.
                    int target = TransactionMessageUtil.partitionFor(entry.timestamp(), partitionCount);
                    ProducerRecord<ByteBuffer, ByteBuffer> outgoing =
                        new ProducerRecord<>(reconciliationTopic, target, entry.key(), entry.value());

                    sink.send(outgoing);
                }
            }
        }
    }