in src/it/java/com/epam/digital/data/platform/kafkaapi/core/listener/AsyncDataLoadKafkaListenerIT.java [110:142]
void shouldSuccessfullyProcessKafkaMessage() {
  // Happy path: the listener receives an async data-load message whose checksum matches the
  // CSV content returned by the (mocked) lowcode file storage, and answers with SUCCESS / "OK".
  //
  // The fixture is opened twice because a stream can only be consumed once: one copy is handed
  // to the stubbed storage service, the other is hashed to build the request checksum.
  // Both streams are null-checked up front (getResourceAsStream returns null for a missing
  // resource) and closed by try-with-resources once all assertions have run.
  try (InputStream storedContent = Objects.requireNonNull(
          AsyncDataLoadKafkaListenerIT.class.getResourceAsStream("/csv/mockEntity.csv"),
          "Test resource /csv/mockEntity.csv not found");
      InputStream checksumSource = Objects.requireNonNull(
          AsyncDataLoadKafkaListenerIT.class.getResourceAsStream("/csv/mockEntity.csv"),
          "Test resource /csv/mockEntity.csv not found")) {

    // Stub both storage services: loading returns the CSV fixture, saving returns a mock.
    FileDataDto fileDataDto = FileDataDto.builder()
        .content(storedContent)
        .build();
    when(lowcodeFileDataStorageService.loadByKey(anyString())).thenReturn(fileDataDto);
    when(datafactoryFileDataStorageService.save(anyString(), any())).thenReturn(
        mock(FileMetadataDto.class));

    // Build the incoming message; the request carries the SHA-256 hex digest of the fixture.
    var payload = getAsyncDataLoadRequest(
        DigestUtils.sha256Hex(IOUtils.toByteArray(checksumSource)));
    var message = MessageBuilder
        .withPayload(payload)
        .copyHeaders(buildHeaders())
        .build();

    var result = listener.asyncDataLoad(message);

    // The response payload is deserialized and compared field by field with the expected one.
    var expectedResultPayload = getResponseMessage(Status.SUCCESS, "OK").getPayload();
    var resultPayload = objectMapper.readValue(result.getPayload(), AsyncDataLoadResponse.class);
    assertEquals(expectedResultPayload.getPayload(), resultPayload.getPayload());
    assertEquals(expectedResultPayload.getDetails(), resultPayload.getDetails());
    assertEquals(expectedResultPayload.getStatus(), resultPayload.getStatus());

    // Two upserts are expected — presumably one per data row in mockEntity.csv (confirm against
    // the fixture) — with no rollback and exactly one load/save on the storage services.
    verify(handler, times(2)).upsert(any());
    verify(transactionManager, never()).rollback(any());
    verify(lowcodeFileDataStorageService).loadByKey(anyString());
    verify(datafactoryFileDataStorageService).save(anyString(), any());
  }
}