in core/src/main/java/org/apache/ignite/activestore/impl/publisher/PublisherKafkaService.java [41:64]
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
String groupId) {
String topic = config.getLocalTopic();
Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);
try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
}
consumer.assign(seekMap.keySet());
Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
if (entry.getValue() != null) {
offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
}
}
consumer.commitSync(offsetsToCommit);
}
}