public void shouldSendMessageToKafka()

in ddm-bpm-extension/src/it/java/com/epam/digital/data/platform/bpms/extension/it/AsyncDataLoadDelegateIT.java [60:89]


  public void shouldSendMessageToKafka() throws JsonProcessingException {
    var response = SignResponseDto.builder().signature("test").build();
    var expectedPayload = "{\"file\":{\"checksum\":\"test-checksum\",\"id\":\"file-id\"},\"derivedFile\":{\"checksum\":\"derived-checksum\",\"id\":\"derived-file-id\"}}";

    digitalSignatureMockServer.addStubMapping(
        stubFor(post(urlPathEqualTo("/api/eseal/sign"))
            .withHeader("Content-Type", equalTo("application/json"))
            .withHeader("X-Access-Token", equalTo("token"))
            .withRequestBody(equalTo(
                "{\"data\":\"{\\\"file\\\":{\\\"checksum\\\":\\\"test-checksum\\\",\\\"id\\\":\\\"file-id\\\"},\\\"derivedFile\\\":{\\\"checksum\\\":\\\"derived-checksum\\\",\\\"id\\\":\\\"derived-file-id\\\"}}\"}"))
            .willReturn(
                aResponse().withHeader("Content-Type", "application/json").withStatus(200)
                    .withBody(objectMapper.writeValueAsString(response)))));

    var processInstance = runtimeService
        .startProcessInstanceByKey("testAsyncDataLoadDelegate_key");

    BpmnAwareTests.assertThat(processInstance).isEnded();

    await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).untilAsserted(() -> {
      var storage = listener.getStorage();
      assertThat(storage).hasSize(1);
      var message = storage.get(processInstance.getProcessInstanceId());
      assertThat(message.getPayload()).isEqualTo(expectedPayload);

      var headers = message.getHeaders();
      assertThat(headers).containsEntry(AsyncDataLoadDelegate.ENTITY_NAME_HEADER, "test");
      assertThat(headers).containsEntry(AsyncDataLoadDelegate.RESULT_VARIABLE_HEADER, "resultVariable");
    });
  }