in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala [636:805]
/**
 * Dispatches a single [[Operation]] to the handler for its concrete type and returns the
 * resulting [[TableResult]].
 *
 * Summary of the visible behaviour:
 *  - sink-modify and query operations are forwarded to `executeInternal`;
 *  - table, view and database DDL as well as USE statements act on the catalog (manager) and
 *    answer with `TableResultImpl.TABLE_RESULT_OK`;
 *  - function DDL is delegated to the dedicated create/drop/alter function helpers;
 *  - SHOW statements are rendered through `buildShowResult`, DESCRIBE through
 *    `buildDescribeResult`, and EXPLAIN yields one row with a single `result` string column.
 *
 * @param operation the operation to execute
 * @return the result of the executed operation
 * @throws ValidationException if a catalog call reports a missing table/database, an already
 *                             existing database or a non-empty database, or if the object of
 *                             a DESCRIBE statement cannot be found
 * @throws TableException if any other exception is raised by an ALTER TABLE or database DDL
 *                        catalog call, or if the operation type is not supported
 */
private def executeOperation(operation: Operation): TableResult = {
  // The order of the cases is significant: the first matching type pattern wins.
  operation match {
    case sinkOp: CatalogSinkModifyOperation =>
      executeInternal(JCollections.singletonList[ModifyOperation](sinkOp))

    case createOp: CreateTableOperation =>
      val table = createOp.getCatalogTable
      val identifier = createOp.getTableIdentifier
      val ignoreIfExists = createOp.isIgnoreIfExists
      if (createOp.isTemporary) {
        catalogManager.createTemporaryTable(table, identifier, ignoreIfExists)
      } else {
        catalogManager.createTable(table, identifier, ignoreIfExists)
      }
      TableResultImpl.TABLE_RESULT_OK

    case dropOp: DropTableOperation =>
      val identifier = dropOp.getTableIdentifier
      val ifExists = dropOp.isIfExists
      if (dropOp.isTemporary) {
        catalogManager.dropTemporaryTable(identifier, ifExists)
      } else {
        catalogManager.dropTable(identifier, ifExists)
      }
      TableResultImpl.TABLE_RESULT_OK

    case alterOp: AlterTableOperation =>
      val targetCatalog = getCatalogOrThrowException(alterOp.getTableIdentifier.getCatalogName)
      val errorMessage = getDDLOpExecuteErrorMsg(alterOp.asSummaryString)
      try {
        // NOTE(review): the trailing `false` appears to be the catalog API's
        // ignore-if-not-exists flag - confirm against the Catalog interface.
        alterOp match {
          case rename: AlterTableRenameOperation =>
            val tablePath = rename.getTableIdentifier.toObjectPath
            val newName = rename.getNewTableIdentifier.getObjectName
            targetCatalog.renameTable(tablePath, newName, false)
          case props: AlterTablePropertiesOperation =>
            val tablePath = props.getTableIdentifier.toObjectPath
            targetCatalog.alterTable(tablePath, props.getCatalogTable, false)
        }
        TableResultImpl.TABLE_RESULT_OK
      } catch {
        case e: TableNotExistException => throw new ValidationException(errorMessage, e)
        case e: Exception => throw new TableException(errorMessage, e)
      }

    case createDbOp: CreateDatabaseOperation =>
      val targetCatalog = getCatalogOrThrowException(createDbOp.getCatalogName)
      val errorMessage = getDDLOpExecuteErrorMsg(createDbOp.asSummaryString)
      try {
        targetCatalog.createDatabase(
          createDbOp.getDatabaseName,
          createDbOp.getCatalogDatabase,
          createDbOp.isIgnoreIfExists)
        TableResultImpl.TABLE_RESULT_OK
      } catch {
        case e: DatabaseAlreadyExistException => throw new ValidationException(errorMessage, e)
        case e: Exception => throw new TableException(errorMessage, e)
      }

    case dropDbOp: DropDatabaseOperation =>
      val targetCatalog = getCatalogOrThrowException(dropDbOp.getCatalogName)
      val errorMessage = getDDLOpExecuteErrorMsg(dropDbOp.asSummaryString)
      try {
        targetCatalog.dropDatabase(
          dropDbOp.getDatabaseName,
          dropDbOp.isIfExists,
          dropDbOp.isCascade)
        TableResultImpl.TABLE_RESULT_OK
      } catch {
        case e: DatabaseNotEmptyException => throw new ValidationException(errorMessage, e)
        case e: DatabaseNotExistException => throw new ValidationException(errorMessage, e)
        case e: Exception => throw new TableException(errorMessage, e)
      }

    case alterDbOp: AlterDatabaseOperation =>
      val targetCatalog = getCatalogOrThrowException(alterDbOp.getCatalogName)
      val errorMessage = getDDLOpExecuteErrorMsg(alterDbOp.asSummaryString)
      try {
        targetCatalog.alterDatabase(
          alterDbOp.getDatabaseName,
          alterDbOp.getCatalogDatabase,
          false)
        TableResultImpl.TABLE_RESULT_OK
      } catch {
        case e: DatabaseNotExistException => throw new ValidationException(errorMessage, e)
        case e: Exception => throw new TableException(errorMessage, e)
      }

    // Function DDL: each variant has its own helper which produces the result.
    case fnOp: CreateCatalogFunctionOperation => createCatalogFunction(fnOp)
    case fnOp: CreateTempSystemFunctionOperation => createSystemFunction(fnOp)
    case fnOp: DropCatalogFunctionOperation => dropCatalogFunction(fnOp)
    case fnOp: DropTempSystemFunctionOperation => dropSystemFunction(fnOp)
    case fnOp: AlterCatalogFunctionOperation => alterCatalogFunction(fnOp)

    case useCatalogOp: UseCatalogOperation =>
      catalogManager.setCurrentCatalog(useCatalogOp.getCatalogName)
      TableResultImpl.TABLE_RESULT_OK

    case useDbOp: UseDatabaseOperation =>
      // The catalog is switched first, then the database is selected.
      catalogManager.setCurrentCatalog(useDbOp.getCatalogName)
      catalogManager.setCurrentDatabase(useDbOp.getDatabaseName)
      TableResultImpl.TABLE_RESULT_OK

    case _: ShowCatalogsOperation =>
      buildShowResult("catalog name", listCatalogs())

    case _: ShowCurrentCatalogOperation =>
      buildShowResult("current catalog name", Array(catalogManager.getCurrentCatalog))

    case _: ShowDatabasesOperation =>
      buildShowResult("database name", listDatabases())

    case _: ShowCurrentDatabaseOperation =>
      buildShowResult("current database name", Array(catalogManager.getCurrentDatabase))

    case _: ShowTablesOperation =>
      buildShowResult("table name", listTables())

    case _: ShowFunctionsOperation =>
      buildShowResult("function name", listFunctions())

    case createViewOp: CreateViewOperation =>
      val view = createViewOp.getCatalogView
      val identifier = createViewOp.getViewIdentifier
      val ignoreIfExists = createViewOp.isIgnoreIfExists
      // Views go through the same catalog manager entry points as tables.
      if (createViewOp.isTemporary) {
        catalogManager.createTemporaryTable(view, identifier, ignoreIfExists)
      } else {
        catalogManager.createTable(view, identifier, ignoreIfExists)
      }
      TableResultImpl.TABLE_RESULT_OK

    case dropViewOp: DropViewOperation =>
      val identifier = dropViewOp.getViewIdentifier
      val ifExists = dropViewOp.isIfExists
      if (dropViewOp.isTemporary) {
        catalogManager.dropTemporaryView(identifier, ifExists)
      } else {
        catalogManager.dropView(identifier, ifExists)
      }
      TableResultImpl.TABLE_RESULT_OK

    case _: ShowViewsOperation =>
      buildShowResult("view name", listViews())

    case explainOp: ExplainOperation =>
      val plan = explainInternal(JCollections.singletonList(explainOp.getChild))
      // The explanation is returned as a single row with one string column.
      TableResultImpl.builder
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
        .data(JCollections.singletonList(Row.of(plan)))
        .setPrintStyle(PrintStyle.rawContent())
        .build

    case describeOp: DescribeTableOperation =>
      val identifier = describeOp.getSqlIdentifier
      val lookup = catalogManager.getTable(identifier)
      if (!lookup.isPresent) {
        throw new ValidationException(String.format(
          "Table or view with identifier '%s' doesn't exist",
          identifier.asSummaryString()))
      }
      buildDescribeResult(lookup.get.getTable.getSchema)

    case queryOp: QueryOperation =>
      executeInternal(queryOp)

    case _ =>
      throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
  }
}