in flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java [757:1071]
private TableResult executeOperation(Operation operation) {
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
if (createTableOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(),
dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(),
dropTableOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof AlterTableOperation) {
AlterTableOperation alterTableOperation = (AlterTableOperation) operation;
Catalog catalog = getCatalogOrThrowException(alterTableOperation.getTableIdentifier().getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(alterTableOperation.asSummaryString());
try {
if (alterTableOperation instanceof AlterTableRenameOperation) {
AlterTableRenameOperation alterTableRenameOp = (AlterTableRenameOperation) operation;
catalog.renameTable(
alterTableRenameOp.getTableIdentifier().toObjectPath(),
alterTableRenameOp.getNewTableIdentifier().getObjectName(),
false);
} else if (alterTableOperation instanceof AlterTablePropertiesOperation) {
AlterTablePropertiesOperation alterTablePropertiesOp = (AlterTablePropertiesOperation) operation;
catalog.alterTable(
alterTablePropertiesOp.getTableIdentifier().toObjectPath(),
alterTablePropertiesOp.getCatalogTable(),
false);
} else if (alterTableOperation instanceof AlterTableAddConstraintOperation){
AlterTableAddConstraintOperation addConstraintOP =
(AlterTableAddConstraintOperation) operation;
CatalogTable oriTable = (CatalogTable) catalogManager
.getTable(addConstraintOP.getTableIdentifier())
.get()
.getTable();
TableSchema.Builder builder = TableSchemaUtils
.builderWithGivenSchema(oriTable.getSchema());
if (addConstraintOP.getConstraintName().isPresent()) {
builder.primaryKey(
addConstraintOP.getConstraintName().get(),
addConstraintOP.getColumnNames());
} else {
builder.primaryKey(addConstraintOP.getColumnNames());
}
CatalogTable newTable = new CatalogTableImpl(
builder.build(),
oriTable.getPartitionKeys(),
oriTable.getOptions(),
oriTable.getComment());
catalog.alterTable(
addConstraintOP.getTableIdentifier().toObjectPath(),
newTable,
false);
} else if (alterTableOperation instanceof AlterTableDropConstraintOperation){
AlterTableDropConstraintOperation dropConstraintOperation =
(AlterTableDropConstraintOperation) operation;
CatalogTable oriTable = (CatalogTable) catalogManager
.getTable(dropConstraintOperation.getTableIdentifier())
.get()
.getTable();
CatalogTable newTable = new CatalogTableImpl(
TableSchemaUtils.dropConstraint(
oriTable.getSchema(),
dropConstraintOperation.getConstraintName()),
oriTable.getPartitionKeys(),
oriTable.getOptions(),
oriTable.getComment());
catalog.alterTable(
dropConstraintOperation.getTableIdentifier().toObjectPath(),
newTable,
false);
} else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation;
catalog.alterPartition(alterPartPropsOp.getTableIdentifier().toObjectPath(),
alterPartPropsOp.getPartitionSpec(),
alterPartPropsOp.getCatalogPartition(),
false);
} else if (alterTableOperation instanceof AlterTableSchemaOperation) {
AlterTableSchemaOperation alterTableSchemaOperation = (AlterTableSchemaOperation) alterTableOperation;
catalog.alterTable(alterTableSchemaOperation.getTableIdentifier().toObjectPath(),
alterTableSchemaOperation.getCatalogTable(),
false);
} else if (alterTableOperation instanceof AddPartitionsOperation) {
AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;
List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();
boolean ifNotExists = addPartitionsOperation.ifNotExists();
ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();
for (int i = 0; i < specs.size(); i++) {
catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists);
}
} else if (alterTableOperation instanceof DropPartitionsOperation) {
DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;
ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();
boolean ifExists = dropPartitionsOperation.ifExists();
for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
catalog.dropPartition(tablePath, spec, ifExists);
}
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
if (createViewOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createViewOperation.getCatalogView(),
createViewOperation.getViewIdentifier(),
createViewOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createViewOperation.getCatalogView(),
createViewOperation.getViewIdentifier(),
createViewOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropViewOperation) {
DropViewOperation dropViewOperation = (DropViewOperation) operation;
if (dropViewOperation.isTemporary()) {
catalogManager.dropTemporaryView(
dropViewOperation.getViewIdentifier(),
dropViewOperation.isIfExists());
} else {
catalogManager.dropView(
dropViewOperation.getViewIdentifier(),
dropViewOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof AlterViewOperation) {
AlterViewOperation alterViewOperation = (AlterViewOperation) operation;
Catalog catalog = getCatalogOrThrowException(alterViewOperation.getViewIdentifier().getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(alterViewOperation.asSummaryString());
try {
if (alterViewOperation instanceof AlterViewRenameOperation) {
AlterViewRenameOperation alterTableRenameOp = (AlterViewRenameOperation) operation;
catalog.renameTable(
alterTableRenameOp.getViewIdentifier().toObjectPath(),
alterTableRenameOp.getNewViewIdentifier().getObjectName(),
false);
} else if (alterViewOperation instanceof AlterViewPropertiesOperation) {
AlterViewPropertiesOperation alterTablePropertiesOp = (AlterViewPropertiesOperation) operation;
catalog.alterTable(
alterTablePropertiesOp.getViewIdentifier().toObjectPath(),
alterTablePropertiesOp.getCatalogView(),
false);
} else if (alterViewOperation instanceof AlterViewAsOperation) {
AlterViewAsOperation alterViewAsOperation = (AlterViewAsOperation) alterViewOperation;
catalog.alterTable(alterViewAsOperation.getViewIdentifier().toObjectPath(),
alterViewAsOperation.getNewView(),
false);
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof CreateDatabaseOperation) {
CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString());
try {
catalog.createDatabase(
createDatabaseOperation.getDatabaseName(),
createDatabaseOperation.getCatalogDatabase(),
createDatabaseOperation.isIgnoreIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseAlreadyExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof DropDatabaseOperation) {
DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString());
try {
catalog.dropDatabase(
dropDatabaseOperation.getDatabaseName(),
dropDatabaseOperation.isIfExists(),
dropDatabaseOperation.isCascade());
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof AlterDatabaseOperation) {
AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString());
try {
catalog.alterDatabase(
alterDatabaseOperation.getDatabaseName(),
alterDatabaseOperation.getCatalogDatabase(),
false);
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof CreateCatalogFunctionOperation) {
return createCatalogFunction((CreateCatalogFunctionOperation) operation);
} else if (operation instanceof CreateTempSystemFunctionOperation) {
return createSystemFunction((CreateTempSystemFunctionOperation) operation);
} else if (operation instanceof DropCatalogFunctionOperation) {
return dropCatalogFunction((DropCatalogFunctionOperation) operation);
} else if (operation instanceof DropTempSystemFunctionOperation) {
return dropSystemFunction((DropTempSystemFunctionOperation) operation);
} else if (operation instanceof AlterCatalogFunctionOperation) {
return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
} else if (operation instanceof CreateCatalogOperation) {
return createCatalog((CreateCatalogOperation) operation);
} else if (operation instanceof DropCatalogOperation) {
DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
try {
catalogManager.unregisterCatalog(dropCatalogOperation.getCatalogName(),
dropCatalogOperation.isIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(exMsg, e);
}
} else if (operation instanceof UseCatalogOperation) {
UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof UseDatabaseOperation) {
UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation;
catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName());
catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName());
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof ShowCatalogsOperation) {
return buildShowResult("catalog name", listCatalogs());
} else if (operation instanceof ShowCurrentCatalogOperation){
return buildShowResult("current catalog name", new String[]{catalogManager.getCurrentCatalog()});
} else if (operation instanceof ShowDatabasesOperation) {
return buildShowResult("database name", listDatabases());
} else if (operation instanceof ShowCurrentDatabaseOperation) {
return buildShowResult("current database name", new String[]{catalogManager.getCurrentDatabase()});
} else if (operation instanceof ShowTablesOperation) {
return buildShowResult("table name", listTables());
} else if (operation instanceof ShowFunctionsOperation) {
return buildShowResult("function name", listFunctions());
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult("view name", listViews());
} else if (operation instanceof ShowPartitionsOperation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
Catalog catalog = getCatalogOrThrowException(showPartitionsOperation.getTableIdentifier().getCatalogName());
ObjectPath tablePath = showPartitionsOperation.getTableIdentifier().toObjectPath();
CatalogPartitionSpec partitionSpec = showPartitionsOperation.getPartitionSpec();
List<CatalogPartitionSpec> partitionSpecs = partitionSpec == null ? catalog.listPartitions(tablePath) : catalog.listPartitions(tablePath, partitionSpec);
List<String> partitionNames = new ArrayList<>(partitionSpecs.size());
for (CatalogPartitionSpec spec: partitionSpecs) {
List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());
for (Map.Entry<String, String> partitionKV: spec.getPartitionSpec().entrySet()) {
partitionKVs.add(partitionKV.getKey() + "=" + partitionKV.getValue());
}
partitionNames.add(String.join("/", partitionKVs));
}
return buildShowResult("partition name", partitionNames.toArray(new String[0]));
} catch (TableNotExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof ExplainOperation) {
String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
return TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of(explanation)))
.setPrintStyle(TableResultImpl.PrintStyle.rawContent())
.build();
} else if (operation instanceof DescribeTableOperation) {
DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
Optional<CatalogManager.TableLookupResult> result =
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
if (result.isPresent()) {
return buildDescribeResult(result.get().getResolvedSchema());
} else {
throw new ValidationException(String.format(
"Tables or views with the identifier '%s' doesn't exist",
describeTableOperation.getSqlIdentifier().asSummaryString()));
}
} else if (operation instanceof QueryOperation) {
return executeInternal((QueryOperation) operation);
} else {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
}