void shouldSuccessfullyProcessKafkaMessage()

in src/it/java/com/epam/digital/data/platform/kafkaapi/core/listener/AsyncDataLoadKafkaListenerIT.java [110:142]


  void shouldSuccessfullyProcessKafkaMessage() {
    InputStream resourceAsStream1 = AsyncDataLoadKafkaListenerIT.class.getResourceAsStream(
        "/csv/mockEntity.csv");
    InputStream resourceAsStream2 = AsyncDataLoadKafkaListenerIT.class.getResourceAsStream(
        "/csv/mockEntity.csv");
    FileDataDto fileDataDto = FileDataDto.builder()
        .content(resourceAsStream1)
        .build();
    when(lowcodeFileDataStorageService.loadByKey(anyString())).thenReturn(fileDataDto);
    when(datafactoryFileDataStorageService.save(anyString(), any())).thenReturn(
        mock(FileMetadataDto.class));

    var payload = getAsyncDataLoadRequest(
        DigestUtils.sha256Hex(IOUtils.toByteArray(Objects.requireNonNull(resourceAsStream2))));
    var message = MessageBuilder
        .withPayload(payload)
        .copyHeaders(buildHeaders())
        .build();

    var result = listener.asyncDataLoad(message);

    var expectedResultPayload = getResponseMessage(Status.SUCCESS, "OK").getPayload();
    var resultPayload = objectMapper.readValue(result.getPayload(), AsyncDataLoadResponse.class);

    assertEquals(expectedResultPayload.getPayload(), resultPayload.getPayload());
    assertEquals(expectedResultPayload.getDetails(), resultPayload.getDetails());
    assertEquals(expectedResultPayload.getStatus(), resultPayload.getStatus());

    verify(handler, times(2)).upsert(any());
    verify(transactionManager, never()).rollback(any());
    verify(lowcodeFileDataStorageService).loadByKey(anyString());
    verify(datafactoryFileDataStorageService).save(anyString(), any());
  }