in src/main/java/com/epam/digital/data/platform/restapi/core/service/GenericService.java [120:138]
private ConsumerRecord<String, String> sendRequest(
Request<I> input, ProducerRecord<String, Request<I>> request) {
var header = new RecordHeader(KafkaHeaders.REPLY_TOPIC, topics.getReply().getBytes());
request.headers().add(header);
log.info("Sending to Kafka, topic {}", request.topic());
var replyFuture = replyingKafkaTemplate.sendAndReceive(request);
try {
var response = replyFuture.get(30L, TimeUnit.SECONDS);
log.info(
"Successfully got response from Kafka, topic: {}, key: {}",
response.topic(),
response.key());
return response;
} catch (Exception e) {
throw new NoKafkaResponseException("No response for request: " + input, e);
}
}