public void apply()

in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timescale/TimescaleMigrationService.java [49:200]


    public void apply(SchemaChangeMessage model, String schemaName) {
        long migrationTimestamp = model.getTimeStampMs();
        long migrationVersion = model.getVersion();

        LOG.info().append("Try to handle migration message for scheme: ").append(schemaName).append(" with timestamp: ")
                .append(migrationTimestamp).commit();

        ObjectArrayList<ClassDescriptorInfo> newState = model.getNewState();
        TimescaleSchema newSchema = schemaDefinition.getTimebaseSchemaDefinition(newState, schemaName);
        ObjectArrayList<ClassDescriptorInfo> previousState = model.getPreviousState();
        TimescaleSchema previousSchema = schemaDefinition.getTimebaseSchemaDefinition(previousState, schemaName);

        ObjectArrayList<SchemaDescriptorChangeActionInfo> changes = model.getDescriptorChangeActions();
        changes.forEach(change -> {
            ClassDescriptorInfo previousDescriptor = change.getPreviousState();
            String previousDescriptorName = previousDescriptor != null ? previousDescriptor.getName().toString() : null;
            ClassDescriptorInfo newDescriptor = change.getNewState();
            String newDescriptorName = newDescriptor != null ? newDescriptor.getName().toString() : null;

            if (change.hasChangeTypes()) {
                //ALTER DELETE ADD etc.
                SchemaDescriptorChangeType changeType = change.getChangeTypes();

                switch (changeType) {
                    case ADD:
                        if (newDescriptor instanceof RecordClassDescriptor) {
                            Set<TimescaleColumn> addColumns = getTimescaleColumns(newSchema, (RecordClassDescriptorInfo) newDescriptor);
                            handleAddColumnsOperation(schemaName, addColumns);
                        }
                        break;
                    case DELETE:
                        if (previousDescriptor instanceof RecordClassDescriptor) {
                            Set<TimescaleColumn> removeColumns = getTimescaleColumns(previousSchema, (RecordClassDescriptor) previousDescriptor)
                                    .stream()
                                    .filter(column -> column.getRelatedDescriptors().size() <= 1)
                                    .collect(Collectors.toSet());
                            handleDropColumnsOperation(schemaName, removeColumns);
                        }
                        break;
                    case RENAME:
                        // rename descriptor_name field
                        if (previousDescriptor instanceof RecordClassDescriptor && newDescriptor instanceof RecordClassDescriptor) {
                            handleRenameDescriptorOperation(previousSchema, previousDescriptorName,
                                    newDescriptorName);
                        }
                        break;
                    case CONTENT_TYPE_CHANGE:
                        // remove all records with descriptor_name
                        handleDropRecordOperation(previousSchema, previousDescriptorName);
                        break;
                    case FIELDS_CHANGE:
                        // handle single column change
                        ObjectList<SchemaFieldChangeActionInfo> fieldChangeActions = change.getFieldChangeActions();
                        for (int i = 0; i < fieldChangeActions.size(); i++) {
                            SchemaFieldChangeActionInfo action = fieldChangeActions.getObject(i);
                            SchemaFieldChangeType fieldChangeType = action.getChangeTypes();
                            DataFieldInfo previousColumn = action.getPreviousState();
                            DataFieldInfo newColumn = action.getNewState();

                            switch (fieldChangeType) {
                                case RENAME:
                                    //rename field operation
                                    TimescaleColumn previousColumnForRename = getTimescaleColumn(previousSchema, previousColumn,
                                            previousDescriptorName);
                                    TimescaleColumn newColumnForRename = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                    handleRenameColumnOperation(schemaName, previousColumnForRename, newColumnForRename);
                                    break;
                                case DELETE:
                                    // get previous state and delete this field
                                    TimescaleColumn previousColumnForDelete = getTimescaleColumn(previousSchema, previousColumn,
                                            previousDescriptorName);
                                    handleDropColumnOperation(schemaName, previousColumnForDelete);
                                    break;
                                case ADD:
                                    // get new state and add it
                                    TimescaleColumn newColumnForAdd = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                    handleAddColumnOperation(schemaName, newColumnForAdd);
                                    break;
                                case DATA_TYPE_CHANGE:
                                    if (action.hasDataTransformation()) {
                                        SchemaFieldDataTransformationInfo dataTransformation = action.getDataTransformation();
                                        if (dataTransformation.hasTransformationType()) {
                                            SchemaFieldDataTransformationType transformationType = dataTransformation.getTransformationType();

                                            switch (transformationType) {
                                                case DROP_RECORD:
                                                    handleDropRecordOperation(previousSchema, previousDescriptorName);
                                                    break;
                                                case SET_DEFAULT:
                                                    // set new value where null
                                                    CharSequence defaultValue = dataTransformation.getDefaultValue();
                                                    TimescaleColumn newColumnWithDefaultValue = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                                    handleSetDefaultValueOperation(schemaName, newColumnWithDefaultValue, defaultValue.toString());
                                                    break;
                                                case CONVERT_DATA:
                                                    TimescaleColumn newColumnWithConvertedType = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                                    handleChangeDataTypeOperation(schemaName, newColumnWithConvertedType);
                                                    break;
                                                default:
                                                    LOG.warn().append("Unsupported transformation type: ").append(transformationType).commit();
                                                    break;
                                            }
                                        }
                                    }
                                    break;
                                case STATIC_VALUE_CHANGE:
                                    //rename all default prev default values to new one
                                    CharSequence defaultValue = action.getDataTransformation().getDefaultValue();
                                    TimescaleColumn newStaticColumn = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                    handleChangeDefaultValueOperation(schemaName, newStaticColumn, defaultValue.toString());
                                    break;
                                case MODIFIER_CHANGE:
                                    TimescaleColumn columnWithChangedModifier = getTimescaleColumn(newSchema, newColumn, newDescriptorName);
                                    if (newColumn instanceof StaticDataField) {
                                        String newDefaultVal = action.getDataTransformation().getDefaultValue().toString();
                                        handleChangeDefaultValueOperation(schemaName, columnWithChangedModifier, newDefaultVal);
                                    } else {
                                        if (action.hasDataTransformation()) {
                                            if (action.getDataTransformation().hasDefaultValue()) {
                                                String newDefaultVal = action.getDataTransformation().getDefaultValue().toString();
                                                handleChangeDefaultValueOperation(schemaName, columnWithChangedModifier, newDefaultVal);
                                            }
                                        }
                                    }
                                    break;
                                case DESCRIPTION_CHANGE:
                                case PRIMARY_KEY_CHANGE:
                                case TITLE_CHANGE:
                                case ORDINAL_CHANGE:
                                case RELATION_CHANGE:
                                    LOG.debug().append("Ignore field change action: ").append(fieldChangeType).commit();
                                    break;
                                default:
                                    LOG.warn().append("Unsupported field change action: ").append(fieldChangeType).commit();
                                    break;
                            }
                        }
                        break;
                    default:
                        LOG.warn().append("There is no implementation for handling change type: ").append(changeType).commit();
                        break;
                }
            }
        });

        MigrationMetadata migrationMetadata = migrationMetadataService.getByStreamName(schemaName).get();
        migrationMetadata.setVersion(migrationVersion);
        migrationMetadata.setIsSuccess(Boolean.TRUE);
        migrationMetadata.setDateTime(System.currentTimeMillis());

        migrationMetadataService.update(migrationMetadata);
    }