in src/main/java/com/epam/digital/data/platform/starter/kafka/config/KafkaRequestReplyConfig.java [45:62]
// Builds the ReplyingKafkaTemplate used for request-reply messaging.
//
// The template sends through the given producer factory and receives replies on a
// listener container that is created from the given container factory and subscribed
// to the reply topic of every handler found in the request-reply section of
// kafkaProperties. The default reply timeout is taken from the same configuration
// section and is expressed in seconds.
//
// producerFactory - factory handed to the template for producing request records
// concurrentKafkaListenerContainerFactory - factory used to create the reply container
// returns the configured template
public <I, O> ReplyingKafkaTemplate<String, I, O> replyingKafkaTemplate(
    ProducerFactory<String, I> producerFactory,
    ConcurrentKafkaListenerContainerFactory<String, O> concurrentKafkaListenerContainerFactory) {
  // One reply topic per configured request-reply handler.
  String[] replyTopics =
      kafkaProperties.getRequestReply().getTopics().values().stream()
          .map(handler -> handler.getReply())
          .toArray(size -> new String[size]);

  ConcurrentMessageListenerContainer<String, O> container =
      concurrentKafkaListenerContainerFactory.createContainer(replyTopics);
  // A fresh random group id is generated each time this method runs.
  // NOTE(review): presumably so that every application instance consumes all replies
  // instead of sharing partitions with its peers - confirm.
  container.getContainerProperties().setGroupId(UUID.randomUUID().toString());
  // NOTE(review): presumably keeps the container from failing when a reply topic has
  // not been created yet - confirm against the Spring Kafka version in use.
  container.getContainerProperties().setMissingTopicsFatal(false);

  ReplyingKafkaTemplate<String, I, O> template =
      new ReplyingKafkaTemplate<>(producerFactory, container);
  Duration replyTimeout =
      Duration.ofSeconds(kafkaProperties.getRequestReply().getTimeoutInSeconds());
  template.setDefaultReplyTimeout(replyTimeout);
  // The reply topics are flagged as shared on the template.
  template.setSharedReplyTopic(true);
  return template;
}