in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timescale/TimescaleMigrationService.java [49:200]
/**
 * Applies a schema change message to the Timescale table that belongs to {@code schemaName}
 * and then records the migration as successful in the migration metadata.
 *
 * <p>Both the previous and the new descriptor state carried by the message are converted to
 * {@link TimescaleSchema} instances; every descriptor change action is then dispatched to the
 * matching {@code handle...Operation} method. The metadata is updated only after all actions
 * completed without throwing.
 *
 * @param model      schema change message holding the previous state, the new state and the
 *                   list of descriptor change actions
 * @param schemaName name of the stream whose table is migrated; also used as the key for the
 *                   migration metadata lookup
 * @throws java.util.NoSuchElementException if no migration metadata exists for {@code schemaName}
 * @throws NullPointerException             if a change action that needs a default value does not
 *                                          carry one (see {@link #requireDefaultValue})
 */
public void apply(SchemaChangeMessage model, String schemaName) {
    long migrationTimestamp = model.getTimeStampMs();
    long migrationVersion = model.getVersion();
    LOG.info().append("Try to handle migration message for scheme: ").append(schemaName).append(" with timestamp: ")
            .append(migrationTimestamp).commit();

    ObjectArrayList<ClassDescriptorInfo> newState = model.getNewState();
    TimescaleSchema newSchema = schemaDefinition.getTimebaseSchemaDefinition(newState, schemaName);
    ObjectArrayList<ClassDescriptorInfo> previousState = model.getPreviousState();
    TimescaleSchema previousSchema = schemaDefinition.getTimebaseSchemaDefinition(previousState, schemaName);

    ObjectArrayList<SchemaDescriptorChangeActionInfo> changes = model.getDescriptorChangeActions();
    changes.forEach(change -> applyDescriptorChange(change, schemaName, previousSchema, newSchema));

    // Reached only when every change action above finished without an exception.
    // NOTE(review): assumes getByStreamName returns java.util.Optional - confirm.
    MigrationMetadata migrationMetadata = migrationMetadataService.getByStreamName(schemaName)
            .orElseThrow(() -> new java.util.NoSuchElementException(
                    "Migration metadata not found for stream: " + schemaName));
    migrationMetadata.setVersion(migrationVersion);
    migrationMetadata.setIsSuccess(Boolean.TRUE);
    migrationMetadata.setDateTime(System.currentTimeMillis());
    migrationMetadataService.update(migrationMetadata);
}

/**
 * Dispatches a single descriptor-level change (ADD, DELETE, RENAME, ...) to its handler.
 * Actions without a change type are skipped; unknown change types are logged and skipped.
 */
private void applyDescriptorChange(SchemaDescriptorChangeActionInfo change, String schemaName,
                                   TimescaleSchema previousSchema, TimescaleSchema newSchema) {
    ClassDescriptorInfo previousDescriptor = change.getPreviousState();
    String previousDescriptorName = previousDescriptor != null ? previousDescriptor.getName().toString() : null;
    ClassDescriptorInfo newDescriptor = change.getNewState();
    String newDescriptorName = newDescriptor != null ? newDescriptor.getName().toString() : null;

    if (!change.hasChangeTypes()) {
        return;
    }

    SchemaDescriptorChangeType changeType = change.getChangeTypes();
    switch (changeType) {
        case ADD:
            // Only record descriptors are handled here; other descriptor kinds are skipped.
            if (newDescriptor instanceof RecordClassDescriptor) {
                Set<TimescaleColumn> addColumns = getTimescaleColumns(newSchema, (RecordClassDescriptorInfo) newDescriptor);
                handleAddColumnsOperation(schemaName, addColumns);
            }
            break;
        case DELETE:
            if (previousDescriptor instanceof RecordClassDescriptor) {
                // Drop only columns related to at most one descriptor; columns related to
                // several descriptors are kept.
                Set<TimescaleColumn> removeColumns = getTimescaleColumns(previousSchema, (RecordClassDescriptor) previousDescriptor)
                        .stream()
                        .filter(column -> column.getRelatedDescriptors().size() <= 1)
                        .collect(Collectors.toSet());
                handleDropColumnsOperation(schemaName, removeColumns);
            }
            break;
        case RENAME:
            // rename descriptor_name field
            if (previousDescriptor instanceof RecordClassDescriptor && newDescriptor instanceof RecordClassDescriptor) {
                handleRenameDescriptorOperation(previousSchema, previousDescriptorName, newDescriptorName);
            }
            break;
        case CONTENT_TYPE_CHANGE:
            // remove all records with descriptor_name
            handleDropRecordOperation(previousSchema, previousDescriptorName);
            break;
        case FIELDS_CHANGE:
            // handle each single column change in the order it is listed
            ObjectList<SchemaFieldChangeActionInfo> fieldChangeActions = change.getFieldChangeActions();
            for (int i = 0; i < fieldChangeActions.size(); i++) {
                applyFieldChange(fieldChangeActions.getObject(i), schemaName, previousSchema, newSchema,
                        previousDescriptorName, newDescriptorName);
            }
            break;
        default:
            LOG.warn().append("There is no implementation for handling change type: ").append(changeType).commit();
            break;
    }
}

/**
 * Dispatches a single field-level change of a descriptor to its handler.
 * Description, primary key, title, ordinal and relation changes are logged and ignored.
 */
private void applyFieldChange(SchemaFieldChangeActionInfo action, String schemaName,
                              TimescaleSchema previousSchema, TimescaleSchema newSchema,
                              String previousDescriptorName, String newDescriptorName) {
    SchemaFieldChangeType fieldChangeType = action.getChangeTypes();
    DataFieldInfo previousColumn = action.getPreviousState();
    DataFieldInfo newColumn = action.getNewState();

    switch (fieldChangeType) {
        case RENAME:
            // rename field operation
            TimescaleColumn previousColumnForRename = getTimescaleColumn(previousSchema, previousColumn,
                    previousDescriptorName);
            TimescaleColumn newColumnForRename = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            handleRenameColumnOperation(schemaName, previousColumnForRename, newColumnForRename);
            break;
        case DELETE:
            // get previous state and delete this field
            TimescaleColumn previousColumnForDelete = getTimescaleColumn(previousSchema, previousColumn,
                    previousDescriptorName);
            handleDropColumnOperation(schemaName, previousColumnForDelete);
            break;
        case ADD:
            // get new state and add it
            TimescaleColumn newColumnForAdd = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            handleAddColumnOperation(schemaName, newColumnForAdd);
            break;
        case DATA_TYPE_CHANGE:
            applyDataTypeChange(action, schemaName, previousSchema, newSchema, newColumn,
                    previousDescriptorName, newDescriptorName);
            break;
        case STATIC_VALUE_CHANGE:
            // replace the previous static value with the new one
            String newStaticValue = requireDefaultValue(action.getDataTransformation(), fieldChangeType);
            TimescaleColumn newStaticColumn = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            handleChangeDefaultValueOperation(schemaName, newStaticColumn, newStaticValue);
            break;
        case MODIFIER_CHANGE:
            TimescaleColumn columnWithChangedModifier = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            if (newColumn instanceof StaticDataField) {
                // A static field must come with a value; its absence fails the migration.
                handleChangeDefaultValueOperation(schemaName, columnWithChangedModifier,
                        requireDefaultValue(action.getDataTransformation(), fieldChangeType));
            } else if (action.hasDataTransformation() && action.getDataTransformation().hasDefaultValue()) {
                // For non-static fields a default value is optional; nothing to do without one.
                String newDefaultVal = action.getDataTransformation().getDefaultValue().toString();
                handleChangeDefaultValueOperation(schemaName, columnWithChangedModifier, newDefaultVal);
            }
            break;
        case DESCRIPTION_CHANGE:
        case PRIMARY_KEY_CHANGE:
        case TITLE_CHANGE:
        case ORDINAL_CHANGE:
        case RELATION_CHANGE:
            LOG.debug().append("Ignore field change action: ").append(fieldChangeType).commit();
            break;
        default:
            LOG.warn().append("Unsupported field change action: ").append(fieldChangeType).commit();
            break;
    }
}

/**
 * Handles a field data type change according to the transformation attached to the action.
 * Does nothing when the action has no data transformation or the transformation has no type.
 */
private void applyDataTypeChange(SchemaFieldChangeActionInfo action, String schemaName,
                                 TimescaleSchema previousSchema, TimescaleSchema newSchema,
                                 DataFieldInfo newColumn,
                                 String previousDescriptorName, String newDescriptorName) {
    if (!action.hasDataTransformation()) {
        return;
    }
    SchemaFieldDataTransformationInfo dataTransformation = action.getDataTransformation();
    if (!dataTransformation.hasTransformationType()) {
        return;
    }

    SchemaFieldDataTransformationType transformationType = dataTransformation.getTransformationType();
    switch (transformationType) {
        case DROP_RECORD:
            handleDropRecordOperation(previousSchema, previousDescriptorName);
            break;
        case SET_DEFAULT:
            // set new value where null
            TimescaleColumn newColumnWithDefaultValue = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            handleSetDefaultValueOperation(schemaName, newColumnWithDefaultValue,
                    requireDefaultValue(dataTransformation, transformationType));
            break;
        case CONVERT_DATA:
            TimescaleColumn newColumnWithConvertedType = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
            handleChangeDataTypeOperation(schemaName, newColumnWithConvertedType);
            break;
        default:
            LOG.warn().append("Unsupported transformation type: ").append(transformationType).commit();
            break;
    }
}

/**
 * Returns the default value of {@code transformation} as a string.
 *
 * @param transformation data transformation expected to carry the default value
 * @param changeKind     change or transformation type being processed; used only in the message
 * @throws NullPointerException with a descriptive message if the transformation or its default
 *                              value is {@code null}
 */
private static String requireDefaultValue(SchemaFieldDataTransformationInfo transformation, Object changeKind) {
    java.util.Objects.requireNonNull(transformation,
            "Data transformation is missing for change: " + changeKind);
    CharSequence defaultValue = java.util.Objects.requireNonNull(transformation.getDefaultValue(),
            "Default value is missing for change: " + changeKind);
    return defaultValue.toString();
}