public ReplyingKafkaTemplate replyingKafkaTemplate()

in src/main/java/com/epam/digital/data/platform/starter/restapi/config/KafkaConfig.java [119:131]


  /**
   * Creates and configures a {@link ReplyingKafkaTemplate}.
   *
   * <p>The reply listener container is created from {@code factory} for the topics obtained by
   * calling {@code getReplay()} on every handler found in {@code kafkaProperties.getTopics()}.
   * The container gets a freshly generated random consumer group id and is told not to treat
   * missing topics as fatal. The resulting template is flagged as using a shared reply topic and
   * has a default reply timeout of 30 seconds.
   *
   * @param pf producer factory handed to the template
   * @param factory listener container factory used to create the reply container
   * @param <I> value type of the records sent through the template
   * @param <O> value type of the records received by the reply container
   * @return the configured template
   */
  public <I, O> ReplyingKafkaTemplate<String, I, O> replyingKafkaTemplate(
      ProducerFactory<String, I> pf, ConcurrentKafkaListenerContainerFactory<String, O> factory) {
    // One topic per configured handler; these are the topics the reply container listens on.
    String[] replyTopics =
        kafkaProperties.getTopics().values().stream()
            .map(handler -> handler.getReplay())
            .toArray(length -> new String[length]);

    ConcurrentMessageListenerContainer<String, O> container = factory.createContainer(replyTopics);

    // A new random group id is generated on every call, so the container never joins a
    // pre-existing consumer group (presumably so each application instance reads the reply
    // topics independently of the others).
    String randomGroupId = UUID.randomUUID().toString();
    container.getContainerProperties().setGroupId(randomGroupId);
    container.getContainerProperties().setMissingTopicsFatal(false);

    ReplyingKafkaTemplate<String, I, O> template = new ReplyingKafkaTemplate<>(pf, container);
    template.setDefaultReplyTimeout(Duration.ofSeconds(30L));
    template.setSharedReplyTopic(true);
    return template;
  }