in src/main/java/com/epam/digital/data/platform/kafkaapi/core/listener/AsyncDataLoadKafkaListener.java [107:176]
/**
 * Handles an asynchronous data-load request: transforms the referenced file into entities,
 * upserts every entity inside a single transaction and publishes the outcome to Kafka.
 *
 * <p>All rows are committed together. If any row is rejected, or any other error occurs,
 * the transaction is rolled back and the failure is reported in the response instead of
 * being propagated to the caller.
 *
 * @param requestMessage incoming message whose payload is a JSON-serialized
 *     {@code AsyncDataLoadRequest} and whose headers are used to build the request and
 *     security contexts and to read the result variable and entity name
 * @return the response message that was also sent through {@code kafkaTemplate}
 * @throws IllegalStateException if the response cannot be serialized to JSON
 */
public Message<String> asyncDataLoad(Message<String> requestMessage) {
  var schemaName = getSchemaName(requestMessage);
  var csvProcessor = csvProcessorMap.get(schemaName + "AsyncDataLoadCsvProcessor");
  var commandHandler = commandHandlerMap.get(schemaName + "UpsertCommandHandler");

  MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
  RequestContext requestContext = buildRequestContext(requestMessageHeaders);
  SecurityContext securityContext = buildSecurityContext(requestMessageHeaders);

  AsyncDataLoadResponse loadResponse = new AsyncDataLoadResponse();
  loadResponse.setRequestContext(requestContext);

  Status status;
  String details;

  // Open the transaction only once everything that can fail outside the try block is done,
  // so that an early exception cannot leave a transaction that is never completed.
  TransactionStatus transactionStatus =
      transactionManager.getTransaction(new DefaultTransactionDefinition());
  try {
    AsyncDataLoadRequest payload = objectMapper.readValue(requestMessage.getPayload(),
        AsyncDataLoadRequest.class);
    FileDataDto fileDataDto = getFileDataDto(payload,
        requestContext.getBusinessProcessInstanceId());
    List<Object> entities = csvProcessor.transformFileToEntities(fileDataDto);

    for (int i = 0; i < entities.size(); i++) {
      var request = new Request<>(entities.get(i), requestContext, securityContext);
      Response<EntityId> response = (Response<EntityId>) upsert(commandHandler,
          request).getPayload();
      // Constant-first comparison: a missing status is treated as a failed row, not an NPE.
      if (!Status.SUCCESS.equals(response.getStatus())) {
        // NOTE(review): the "+ 2" offset presumably maps the 0-based entity index to a
        // 1-based file line number after a header row - confirm against the CSV processor.
        var message = String.format("error: %s in line: %d", response.getDetails(), (i + 2));
        throw new ConstraintViolationException(message, response.getDetails());
      }
    }

    transactionManager.commit(transactionStatus);
    // Set the success outcome after the commit so that an input without rows is also
    // reported as a success instead of leaving the status unset.
    status = Status.SUCCESS;
    details = "OK";
  } catch (ConstraintViolationException e) {
    status = e.getKafkaResponseStatus();
    details = e.getMessage();
    rollbackIfNotCompleted(transactionStatus);
  } catch (Exception e) {
    details = e.getMessage();
    status = Status.OPERATION_FAILED;
    rollbackIfNotCompleted(transactionStatus);
  }

  loadResponse.setStatus(status);
  loadResponse.setDetails(details);
  String resultVariable = requestMessageHeaders.get(RESULT_VARIABLE, String.class);
  String entityName = requestMessageHeaders.get(ENTITY_NAME, String.class);
  AsyncDataLoadResult asyncDataLoadResult = buildAsyncDataLoadResult(resultVariable, entityName);
  loadResponse.setPayload(asyncDataLoadResult);

  String convertedPayload;
  try {
    convertedPayload = objectMapper.writeValueAsString(loadResponse);
  } catch (JsonProcessingException e) {
    throw new IllegalStateException("Unexpected serialization error", e);
  }

  MessageHeaders messageHeaders = getMessageHeaders(requestMessageHeaders);
  Message<String> responseMessage = MessageBuilder.withPayload(convertedPayload)
      .copyHeaders(messageHeaders).build();
  kafkaTemplate.send(responseMessage);
  return responseMessage;
}

/**
 * Rolls the transaction back unless it has already been completed.
 *
 * <p>A transaction is already completed when {@code commit} itself failed; calling
 * {@code rollback} on it would raise an {@code IllegalTransactionStateException} that
 * hides the original error.
 *
 * @param transactionStatus status handle of the transaction to roll back
 */
private void rollbackIfNotCompleted(TransactionStatus transactionStatus) {
  if (!transactionStatus.isCompleted()) {
    transactionManager.rollback(transactionStatus);
  }
}