in clns-acuity-admin/acuity-etl/src/main/java/com/acuity/visualisations/batch/writer/InputModelChunkWriterImpl.java [116:243]
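/*
 * Splits the entities carried by the incoming chunks into buckets by comparing their unique
 * and secondary hashes against the hash values holder: rows already present and unchanged are
 * only marked for a state refresh, changed rows are staged for update, and unknown rows receive
 * a fresh id and are staged for insert. Foreign keys are then resolved and the surviving
 * inserts and updates are persisted through the batch DB service.
 */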
public void write(List<? extends OutputModelChunk> outputModelChunks) throws Exception {
    executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int1");
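    // Flatten the entities from all incoming chunks into a single working list.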
    List<OutputEntity> entitiesList = new ArrayList<>();
    for (OutputModelChunk chunk : outputModelChunks) {
        entitiesList.addAll(chunk.getEntities());
    }
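    // Group the entities by class and unique-field hash, then merge duplicates so that
    // each hash maps to a single entity instance.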
    Map<Class<?>, Map<String, List<OutputEntity>>> groupedEntities = WriterUtil.groupByUniqueHash(entitiesList);
    Map<Class<?>, Map<String, OutputEntity>> mergeEntities = mergeEntities(groupedEntities);
    List<OutputEntity> toInsert = new ArrayList<>();
    Map<Class<?>, Map<String, OutputEntity>> toInsertMerged = new LinkedHashMap<>();
    List<OutputEntity> toUpdate = new ArrayList<>();
    List<OutputEntity> toUpdateState = new ArrayList<>();
    List<OutputEntity> allEntities = new ArrayList<>();
    executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int1");
    executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int2");
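    // Compare every merged entity against the hash values already loaded for its class
    // to decide whether it is new, changed, or unchanged.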
    for (Map.Entry<Class<?>, Map<String, OutputEntity>> entry : mergeEntities.entrySet()) {
        try {
            Class<?> entityClass = entry.getKey();
            Map<String, OutputEntity> entitiesByHash = entry.getValue();
            executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-getHashValues");
            Map<OctetString, RowParameters> hashesByEntity = hashValuesHolder.getHashValues(entityClass);
            executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-getHashValues");
            for (Map.Entry<String, OutputEntity> entry2 : entitiesByHash.entrySet()) {
                try {
                    OctetString hash = new OctetString(entry2.getKey());
                    OutputEntity entity = entry2.getValue();
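                    // The unique-field hash is already known to the hash values holder, so the existing row id can be reused.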
                    if (hashesByEntity.containsKey(hash)) {
                        RowParameters rowParameters = hashesByEntity.get(hash);
                        int secondaryHash = rowParameters.getSecondaryHash();
                        int entitySecondaryHash = entity.getIntHashForSecondaryFields();
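                        // Secondary fields are unchanged as well: only the cached row state may need refreshing.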
                        if (secondaryHash == entitySecondaryHash) {
                            entity.setId(rowParameters.getId());
                            if (rowParameters.getState() != null) {
                                switch (rowParameters.getState()) {
                                    case OLD:
                                    case JUST_UPDATED:
                                    case SYNCHRONIZED:
                                    case JUST_INSERTED:
                                    case TMP_JUST_UPDATED:
                                        entity.setState(State.TMP_SYNCHRONIZED);
                                        toUpdateState.add(entity);
                                        break;
                                    case TMP_JUST_INSERTED:
                                    case TMP_SYNCHRONIZED:
                                        break;
                                    default:
                                        break;
                                }
                            }
                        } else {
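                            // Secondary fields differ: keep the existing id and stage the row for update.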
                            entity.setId(rowParameters.getId());
                            if (rowParameters.getState() != null) {
                                switch (rowParameters.getState()) {
                                    case TMP_JUST_INSERTED:
                                        entity.setState(State.TMP_JUST_INSERTED);
                                        break;
                                    default:
                                        entity.setState(State.TMP_JUST_UPDATED);
                                        break;
                                }
                            }
                            toUpdate.add(entity);
                            getDataCommonReport().getFileReport(entity.getSourceName()).incRowsUploaded();
                        }
                    } else {
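                        // Unknown hash: this is a new row, so assign a fresh id and stage it for insertion.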
                        entity.setId(UUID.randomUUID().toString().replace("-", ""));
                        entity.setState(State.TMP_JUST_INSERTED);
                        toInsert.add(entity);
                        toInsertMerged.computeIfAbsent(entityClass, c -> new HashMap<>())
                                .put(entity.getSha1ForUniqueFields(), entity);
                        getDataCommonReport().getFileReport(entity.getSourceName()).incRowsUploaded();
                    }
                    allEntities.add(entity);
                } catch (Exception e) {
                    debug(String.format("Error caught preparing writing data for item %s : %s (%s)",
                            entry2.getValue().allFieldsToString(), e.getMessage(), e.getClass().getName()));
                    throw e;
                }
            }
        } catch (Exception e) {
            debug(String.format("Error caught preparing writing data for entity %s : %s (%s)",
                    entry.getKey().getName(), e.getMessage(), e.getClass().getName()));
            throw e;
        }
    }
    executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-int2");
    executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys");
    executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-1");
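    // Resolve foreign keys for the staged inserts; setForeignKeys returns the entities
    // whose references could not be resolved.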
    List<OutputEntity> fkFailedEntities = foreignKeyService.setForeignKeys(toInsert, toInsertMerged);
    executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-1");
    executionProfiler.startOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-2");
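    // Drop from the insert list every entity whose foreign keys could not be resolved.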
    Map<Class<?>, Map<String, List<OutputEntity>>> groupFailedByUniqueHash = WriterUtil.groupByUniqueHash(fkFailedEntities);
    Iterator<OutputEntity> toInsertIterator = toInsert.iterator();
    while (toInsertIterator.hasNext()) {
        OutputEntity outputEntity = toInsertIterator.next();
        Class<? extends OutputEntity> entityClass = outputEntity.getClass();
        if (groupFailedByUniqueHash.containsKey(entityClass)) {
            String hash = outputEntity.getSha1ForUniqueFields();
            if (groupFailedByUniqueHash.get(entityClass).containsKey(hash)) {
                toInsertIterator.remove();
            }
        }
    }
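    // Refresh the hash values holder so it reflects the ids and states assigned above.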
    refreshHashValueHolder(toInsert);
    refreshHashValueHolder(toUpdate);
    refreshHashValueHolder(toUpdateState);
    // RCT-3687: updates must also reference resolvable foreign keys; entities that fail validation are skipped.
    List<OutputEntity> failedFksToUpdate = foreignKeyService.validateFKsInHashes(toUpdate);
    toUpdate.removeAll(failedFksToUpdate);
    List<OutputEntity> failedFksToUpdateState = foreignKeyService.validateFKsInHashes(toUpdateState);
    toUpdateState.removeAll(failedFksToUpdateState);
    executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys-2");
    executionProfiler.stopOperation(getJobExecutionId(), "InputModelChunkWriterImpl.write-setForeignKeys");
    debug("Inserting: {} , updating: {}, updating state: {}", toInsert.size(), toUpdate.size(), toUpdateState.size());
    bdService.addEntitiesToInsert(toInsert);
    bdService.addEntitiesToUpdate(toUpdate);
    bdService.persist();
    debug("Inserting completed: {} , updating: {}, updating state: {}", toInsert.size(), toUpdate.size(), toUpdateState.size());
}