in ddm-bpm-extension/src/it/java/com/epam/digital/data/platform/bpms/extension/it/listener/AsyncDataLoadResponseKafkaListenerIT.java [56:85]
void shouldReceiveKafkaMessage(String status, String details) throws JsonProcessingException {
var resultVariableName = "response";
var entityName = "item";
var processInstance = runtimeService
.startProcessInstanceByKey("testAsyncDataLoadListener_key");
var payload = AsyncDataLoadResponse.builder()
.payload(AsyncDataLoadResult.builder()
.resultVariable(resultVariableName)
.entityName(entityName)
.build())
.status(status)
.details(details)
.requestContext(RequestContext.builder()
.businessProcessInstanceId(processInstance.getProcessInstanceId())
.build())
.build();
var expectedResult = new Result(status, details);
Message<String> message = MessageBuilder
.withPayload(mapper.writeValueAsString(payload))
.copyHeaders(Map.of(TOPIC, kafkaProperties.getTopics().get("data-load-csv-topic-outbound")))
.build();
kafkaTemplate.send(message);
await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).untilAsserted(() -> {
BpmnAwareTests.assertThat(processInstance).variables().containsKey(resultVariableName);
BpmnAwareTests.assertThat(processInstance).variables().containsValue(expectedResult);
BpmnAwareTests.assertThat(processInstance).isEnded();
});
}