public ReplyingKafkaTemplate replyingKafkaTemplate()

in src/main/java/com/epam/digital/data/platform/starter/kafka/config/KafkaRequestReplyConfig.java [45:62]


  /**
   * Creates the {@link ReplyingKafkaTemplate} used for request/reply exchanges.
   *
   * <p>The reply listener container is subscribed to the reply topic of every handler found in
   * the request-reply section of {@code kafkaProperties}, and the template's default reply
   * timeout is taken from the same configuration section (in seconds).
   *
   * @param producerFactory factory the template uses to create producers for outgoing requests
   * @param concurrentKafkaListenerContainerFactory factory used to create the container that
   *     listens for replies
   * @param <I> type of the request value
   * @param <O> type of the reply value
   * @return the configured template
   */
  public <I, O> ReplyingKafkaTemplate<String, I, O> replyingKafkaTemplate(
      ProducerFactory<String, I> producerFactory,
      ConcurrentKafkaListenerContainerFactory<String, O> concurrentKafkaListenerContainerFactory) {
    // One reply topic per configured request-reply handler.
    String[] replyTopics =
        kafkaProperties.getRequestReply().getTopics().values().stream()
            .map(handler -> handler.getReply())
            .toArray(String[]::new);

    ConcurrentMessageListenerContainer<String, O> container =
        concurrentKafkaListenerContainerFactory.createContainer(replyTopics);
    // A reply topic that does not exist yet must not be treated as fatal by the container.
    container.getContainerProperties().setMissingTopicsFatal(false);
    // A freshly generated random group id is assigned every time this method runs
    // (presumably so each application instance consumes replies in its own group -- confirm).
    String randomGroupId = UUID.randomUUID().toString();
    container.getContainerProperties().setGroupId(randomGroupId);

    ReplyingKafkaTemplate<String, I, O> template =
        new ReplyingKafkaTemplate<>(producerFactory, container);
    // Mark the reply topics as shared with other templates.
    template.setSharedReplyTopic(true);

    long timeoutSeconds = kafkaProperties.getRequestReply().getTimeoutInSeconds();
    Duration replyTimeout = Duration.ofSeconds(timeoutSeconds);
    template.setDefaultReplyTimeout(replyTimeout);
    return template;
  }