public Message asyncDataLoad()

in src/main/java/com/epam/digital/data/platform/kafkaapi/core/listener/AsyncDataLoadKafkaListener.java [107:176]


  /**
   * Handles an asynchronous data-load request: converts the file described by the request payload
   * into entities, upserts them one by one, and publishes a response describing the outcome.
   *
   * <p>The upserts are wrapped in a transaction obtained from {@code transactionManager}. The
   * transaction is committed only if every upsert reports {@code Status.SUCCESS}; the first
   * non-successful row, or any exception, rolls it back and the failure is reported in the
   * response instead of being propagated to the caller.
   *
   * @param requestMessage message whose payload is a JSON-serialized {@code AsyncDataLoadRequest}
   *     and whose headers supply the request context, security context, result variable and
   *     entity name
   * @return the response message, which has also been sent through {@code kafkaTemplate}
   * @throws IllegalStateException if the response cannot be serialized to JSON
   */
  public Message<String> asyncDataLoad(Message<String> requestMessage) {
    var schemaName = getSchemaName(requestMessage);
    var csvProcessor = csvProcessorMap.get(schemaName + "AsyncDataLoadCsvProcessor");
    var commandHandler = commandHandlerMap.get(schemaName + "UpsertCommandHandler");

    MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
    RequestContext requestContext = buildRequestContext(requestMessageHeaders);
    SecurityContext securityContext = buildSecurityContext(requestMessageHeaders);

    AsyncDataLoadResponse loadResponse = new AsyncDataLoadResponse();
    loadResponse.setRequestContext(requestContext);

    Status status;
    String details = "";

    // Start the transaction immediately before the try block so that nothing can throw between
    // opening it and reaching the handlers that complete it.
    TransactionStatus transactionStatus =
        transactionManager.getTransaction(new DefaultTransactionDefinition());
    try {
      AsyncDataLoadRequest payload = objectMapper.readValue(requestMessage.getPayload(),
          AsyncDataLoadRequest.class);
      FileDataDto fileDataDto = getFileDataDto(payload,
          requestContext.getBusinessProcessInstanceId());
      List<Object> list = csvProcessor.transformFileToEntities(fileDataDto);

      for (int i = 0; i < list.size(); i++) {
        var request = new Request<>(list.get(i), requestContext, securityContext);
        Response<EntityId> response = (Response<EntityId>) upsert(commandHandler,
            request).getPayload();

        if (!response.getStatus().equals(Status.SUCCESS)) {
          // NOTE(review): the "+ 2" presumably converts the 0-based entity index to a 1-based
          // file line number that also skips a header row - confirm against the CSV processor.
          var message = String.format("error: %s in line: %d", response.getDetails(), (i + 2));
          // details is still the empty string at this point.
          throw new ConstraintViolationException(message, details);
        }
      }

      transactionManager.commit(transactionStatus);

      // Report success only after the commit went through; this also covers a file that
      // produced no entities, which previously left the status unset.
      status = Status.SUCCESS;
      details = "OK";
    } catch (ConstraintViolationException e) {
      status = e.getKafkaResponseStatus();
      details = e.getMessage();
      rollbackIfNotCompleted(transactionStatus);
    } catch (Exception e) {
      details = e.getMessage();
      status = Status.OPERATION_FAILED;
      rollbackIfNotCompleted(transactionStatus);
    }

    loadResponse.setStatus(status);
    loadResponse.setDetails(details);
    String resultVariable = requestMessageHeaders.get(RESULT_VARIABLE, String.class);
    String entityName = requestMessageHeaders.get(ENTITY_NAME, String.class);
    AsyncDataLoadResult asyncDataLoadResult = buildAsyncDataLoadResult(resultVariable, entityName);
    loadResponse.setPayload(asyncDataLoadResult);
    String convertedPayload;
    try {
      convertedPayload = objectMapper.writeValueAsString(loadResponse);
    } catch (JsonProcessingException e) {
      throw new IllegalStateException("Unexpected serialization error", e);
    }
    MessageHeaders messageHeaders = getMessageHeaders(requestMessageHeaders);
    Message<String> responseMessage = MessageBuilder.withPayload(convertedPayload)
        .copyHeaders(messageHeaders).build();

    kafkaTemplate.send(responseMessage);

    return responseMessage;
  }

  /**
   * Rolls the given transaction back unless it has already been completed.
   *
   * <p>A commit that throws has already completed the transaction, and calling rollback on a
   * completed transaction raises an exception of its own, which would prevent the failure
   * response from being sent.
   *
   * @param transactionStatus status handle of the transaction to roll back
   */
  private void rollbackIfNotCompleted(TransactionStatus transactionStatus) {
    if (!transactionStatus.isCompleted()) {
      transactionManager.rollback(transactionStatus);
    }
  }