public void seekToTransaction()

in core/src/main/java/org/apache/ignite/activestore/impl/publisher/PublisherKafkaService.java [41:64]


    /**
     * Commits, for the given consumer group, the offsets found by a timestamp lookup on every
     * partition of the local topic, using {@code transactionId} as the lookup target.
     * <p>
     * A short-lived consumer is created for the lookup and commit and is closed before returning.
     * <p>
     * NOTE(review): {@code offsetsForTimes} searches by record timestamp, so this presumably relies
     * on records being produced with the transaction id as their timestamp - confirm against the
     * producer side.
     *
     * @param config Recovery configuration supplying the local topic name and base consumer settings.
     * @param transactionId Value passed as the target timestamp for every partition.
     * @param kafkaFactory Factory used to create the temporary consumer.
     * @param groupId Consumer group whose committed offsets are updated.
     */
    public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
        String groupId) {
        String topic = config.getLocalTopic();
        Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

        try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

            // Some Kafka client versions return null instead of an empty list when no metadata
            // is available for the topic; there is nothing to look up or commit in that case.
            if (partitionInfos == null) {
                return;
            }

            // Same target value for every partition of the topic.
            Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());

            for (PartitionInfo partitionInfo : partitionInfos) {
                seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
            }
            consumer.assign(seekMap.keySet());
            Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
                // A null value means the lookup found no matching record in that partition;
                // such partitions are skipped, so their previously committed offset is left as is.
                // NOTE(review): confirm that keeping the old offset is the intended outcome here.
                if (entry.getValue() != null) {
                    offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
                }
            }
            consumer.commitSync(offsetsToCommit);
        }
    }