public void write1()

in clns-acuity-admin/acuity-etl/src/main/java/com/acuity/visualisations/batch/writer/InputModelChunkWriterImpl.java [116:243]


    /**
     * Sorts the entities carried by the given chunks into inserts, updates and state-only
     * updates, resolves foreign keys, and hands the inserts and updates to {@code bdService}.
     *
     * <p>Processing steps, in order:
     * <ol>
     *   <li>All chunk entities are flattened, grouped by unique hash and merged.</li>
     *   <li>Each merged entity is looked up by its unique hash in the hash values held for its
     *       class:
     *       <ul>
     *         <li>not found: a fresh dash-less UUID id and state {@code TMP_JUST_INSERTED} are
     *             assigned and the entity is queued for insert;</li>
     *         <li>found with an equal secondary hash: the stored id is reused and, unless the
     *             stored state is {@code null}, {@code TMP_JUST_INSERTED} or
     *             {@code TMP_SYNCHRONIZED}, the entity is marked {@code TMP_SYNCHRONIZED} and
     *             queued for a state-only update;</li>
     *         <li>found with a different secondary hash: the stored id is reused, the state
     *             becomes {@code TMP_JUST_INSERTED} or {@code TMP_JUST_UPDATED} (left untouched
     *             when the stored state is {@code null}) and the entity is queued for update.</li>
     *       </ul>
     *       The per-file "rows uploaded" counter is incremented for inserts and updates only.</li>
     *   <li>Foreign keys are set on the inserts; entities returned by
     *       {@code foreignKeyService.setForeignKeys} (treated here as foreign-key failures) are
     *       dropped from the insert list.</li>
     *   <li>The hash value holder is refreshed with all three lists, after which entities returned
     *       by {@code foreignKeyService.validateFKsInHashes} are dropped from the update and
     *       state-update lists.</li>
     *   <li>Inserts and updates are registered with {@code bdService} and persisted.</li>
     * </ol>
     *
     * @param outputModelChunks chunks whose entities are to be written
     * @throws Exception any failure raised while classifying, resolving foreign keys or
     *                   persisting; classification failures are logged through {@code debug}
     *                   and rethrown unchanged
     */
    public void write1(List<? extends OutputModelChunk> outputModelChunks) throws Exception {
        executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int1");
        List<OutputEntity> entitiesList = new ArrayList<>();
        for (OutputModelChunk chunk : outputModelChunks) {
            entitiesList.addAll(chunk.getEntities());
        }
        Map<Class<?>, Map<String, List<OutputEntity>>> groupedEntities = WriterUtil.groupByUniqueHash(entitiesList);
        Map<Class<?>, Map<String, OutputEntity>> mergeEntities = mergeEntities(groupedEntities);
        List<OutputEntity> toInsert = new ArrayList<>();
        // Same entities as toInsert, indexed by class and unique hash for the foreign key service.
        Map<Class<?>, Map<String, OutputEntity>> toInsertMerged = new LinkedHashMap<>();
        List<OutputEntity> toUpdate = new ArrayList<>();
        List<OutputEntity> toUpdateState = new ArrayList<>();
        executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int1");

        executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int2");
        for (Map.Entry<Class<?>, Map<String, OutputEntity>> entry : mergeEntities.entrySet()) {
            try {
                Class<?> entityClass = entry.getKey();
                Map<String, OutputEntity> entitiesByHash = entry.getValue();

                executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-getHashValues");
                Map<OctetString, RowParameters> hashesByEntity = hashValuesHolder.getHashValues(entityClass);
                executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-getHashValues");

                for (Map.Entry<String, OutputEntity> entry2 : entitiesByHash.entrySet()) {
                    try {
                        OctetString hash = new OctetString(entry2.getKey());
                        OutputEntity entity = entry2.getValue();
                        if (hashesByEntity.containsKey(hash)) {
                            // Unique hash already known: reuse the stored id in every case.
                            RowParameters rowParameters = hashesByEntity.get(hash);
                            int secondaryHash = rowParameters.getSecondaryHash();
                            int entitySecondaryHash = entity.getIntHashForSecondaryFields();
                            entity.setId(rowParameters.getId());

                            if (secondaryHash == entitySecondaryHash) {
                                // Secondary fields unchanged: at most the state needs updating.
                                if (rowParameters.getState() != null) {
                                    switch (rowParameters.getState()) {
                                        case OLD:
                                        case JUST_UPDATED:
                                        case SYNCHRONIZED:
                                        case JUST_INSERTED:
                                        case TMP_JUST_UPDATED:
                                            entity.setState(State.TMP_SYNCHRONIZED);
                                            toUpdateState.add(entity);
                                            break;
                                        case TMP_JUST_INSERTED:
                                        case TMP_SYNCHRONIZED:
                                        default:
                                            // Nothing to do for these stored states.
                                            break;
                                    }
                                }
                            } else {
                                // Secondary fields differ: the row has to be updated.
                                if (rowParameters.getState() != null) {
                                    switch (rowParameters.getState()) {
                                        case TMP_JUST_INSERTED:
                                            entity.setState(State.TMP_JUST_INSERTED);
                                            break;
                                        default:
                                            entity.setState(State.TMP_JUST_UPDATED);
                                            break;
                                    }
                                }
                                // NOTE(review): with a null stored state the entity keeps whatever
                                // state it already had but is still queued for update.
                                toUpdate.add(entity);
                                getDataCommonReport().getFileReport(entity.getSourceName()).incRowsUploaded();
                            }
                        } else {
                            // Unknown unique hash: new row with a generated 32-character id.
                            entity.setId(UUID.randomUUID().toString().replace("-", ""));
                            entity.setState(State.TMP_JUST_INSERTED);
                            toInsert.add(entity);
                            toInsertMerged.computeIfAbsent(entityClass, c -> new HashMap<>())
                                    .put(entity.getSha1ForUniqueFields(), entity);
                            getDataCommonReport().getFileReport(entity.getSourceName()).incRowsUploaded();
                        }
                    } catch (Exception e) {
                        debug(String.format("Error caught preparing writing data for item %s : %s (%s)",
                                entry2.getValue().allFieldsToString(), e.getMessage(), e.getClass().getName()));
                        throw e;
                    }
                }
            } catch (Exception e) {
                debug(String.format("Error caught preparing writing data for entity %s : %s (%s)",
                        entry.getKey().getName(), e.getMessage(), e.getClass().getName()));
                throw e;
            }
        }
        executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int2");

        executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys");
        executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-1");
        List<OutputEntity> fkFailedEntities = foreignKeyService.setForeignKeys(toInsert, toInsertMerged);
        executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-1");

        executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-2");
        // Drop every insert whose class and unique hash match one of the returned entities.
        Map<Class<?>, Map<String, List<OutputEntity>>> groupFailedByUniqueHash = WriterUtil.groupByUniqueHash(fkFailedEntities);
        Iterator<OutputEntity> toInsertIterator = toInsert.iterator();
        while (toInsertIterator.hasNext()) {
            OutputEntity outputEntity = toInsertIterator.next();
            Map<String, List<OutputEntity>> failedByHash = groupFailedByUniqueHash.get(outputEntity.getClass());
            if (failedByHash != null && failedByHash.containsKey(outputEntity.getSha1ForUniqueFields())) {
                toInsertIterator.remove();
            }
        }
        // The holder is refreshed before validation, which checks the update lists against it.
        refreshHashValueHolder(toInsert);
        refreshHashValueHolder(toUpdate);
        refreshHashValueHolder(toUpdateState);

        //RCT-3687
        List<OutputEntity> failedFksToUpdate = foreignKeyService.validateFKsInHashes(toUpdate);
        toUpdate.removeAll(failedFksToUpdate);
        List<OutputEntity> failedFksToUpdateState = foreignKeyService.validateFKsInHashes(toUpdateState);
        toUpdateState.removeAll(failedFksToUpdateState);

        executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-2");
        executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys");

        debug("Inserting: {} , updating: {}, updating state: {}", toInsert.size(), toUpdate.size(), toUpdateState.size());
        bdService.addEntitiesToInsert(toInsert);
        bdService.addEntitiesToUpdate(toUpdate);
        // NOTE(review): toUpdateState is reported in the log lines but never passed to bdService
        // in this method - confirm state-only updates are persisted elsewhere, or add the call.
        bdService.persist();
        debug("Inserting completed: {} , updating: {}, updating state: {}", toInsert.size(), toUpdate.size(), toUpdateState.size());
    }