void shouldReceiveKafkaMessage()

in ddm-bpm-extension/src/it/java/com/epam/digital/data/platform/bpms/extension/it/listener/AsyncDataLoadResponseKafkaListenerIT.java [56:85]


  void shouldReceiveKafkaMessage(String status, String details) throws JsonProcessingException {
    // Scenario: start the test process, publish an AsyncDataLoadResponse that references the
    // started process instance, then wait until the instance holds the expected Result under
    // the result variable and has ended.
    //
    // status  - status placed into the response and expected in the resulting Result
    // details - details placed into the response and expected in the resulting Result
    var resultVariableName = "response";
    var entityName = "item";
    var processInstance = runtimeService
        .startProcessInstanceByKey("testAsyncDataLoadListener_key");

    // The request context carries the process instance id of the instance started above.
    var payload = AsyncDataLoadResponse.builder()
        .payload(AsyncDataLoadResult.builder()
            .resultVariable(resultVariableName)
            .entityName(entityName)
            .build())
        .status(status)
        .details(details)
        .requestContext(RequestContext.builder()
            .businessProcessInstanceId(processInstance.getProcessInstanceId())
            .build())
        .build();
    var expectedResult = new Result(status, details);

    // The payload is sent as a JSON string; the target topic is resolved from kafkaProperties.
    Message<String> message = MessageBuilder
        .withPayload(mapper.writeValueAsString(payload))
        .copyHeaders(Map.of(TOPIC, kafkaProperties.getTopics().get("data-load-csv-topic-outbound")))
        .build();

    kafkaTemplate.send(message);

    // The message is consumed asynchronously, so the assertions are retried for up to 10 seconds.
    await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).untilAsserted(() -> {
      // Assert key and value together: separate containsKey/containsValue checks would also pass
      // if the expected Result were stored under a different variable name.
      BpmnAwareTests.assertThat(processInstance).variables()
          .containsEntry(resultVariableName, expectedResult);
      BpmnAwareTests.assertThat(processInstance).isEnded();
    });
  }