private def executeOperation()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala [636:805]


  private def executeOperation(operation: Operation): TableResult = {
    operation match {
      case catalogSinkModifyOperation: CatalogSinkModifyOperation =>
        executeInternal(JCollections.singletonList[ModifyOperation](catalogSinkModifyOperation))
      case createTableOperation: CreateTableOperation =>
        if (createTableOperation.isTemporary) {
          catalogManager.createTemporaryTable(
            createTableOperation.getCatalogTable,
            createTableOperation.getTableIdentifier,
            createTableOperation.isIgnoreIfExists
          )
        } else {
          catalogManager.createTable(
            createTableOperation.getCatalogTable,
            createTableOperation.getTableIdentifier,
            createTableOperation.isIgnoreIfExists)
        }
        TableResultImpl.TABLE_RESULT_OK
      case dropTableOperation: DropTableOperation =>
        if (dropTableOperation.isTemporary) {
          catalogManager.dropTemporaryTable(
            dropTableOperation.getTableIdentifier,
            dropTableOperation.isIfExists)
        } else {
          catalogManager.dropTable(
            dropTableOperation.getTableIdentifier,
            dropTableOperation.isIfExists)
        }
        TableResultImpl.TABLE_RESULT_OK
      case alterTableOperation: AlterTableOperation =>
        val catalog = getCatalogOrThrowException(
          alterTableOperation.getTableIdentifier.getCatalogName)
        val exMsg = getDDLOpExecuteErrorMsg(alterTableOperation.asSummaryString)
        try {
          alterTableOperation match {
            case alterTableRenameOp: AlterTableRenameOperation =>
              catalog.renameTable(
                alterTableRenameOp.getTableIdentifier.toObjectPath,
                alterTableRenameOp.getNewTableIdentifier.getObjectName,
                false)
            case alterTablePropertiesOp: AlterTablePropertiesOperation =>
              catalog.alterTable(
                alterTablePropertiesOp.getTableIdentifier.toObjectPath,
                alterTablePropertiesOp.getCatalogTable,
                false)
          }
          TableResultImpl.TABLE_RESULT_OK
        } catch {
          case ex: TableNotExistException => throw new ValidationException(exMsg, ex)
          case ex: Exception => throw new TableException(exMsg, ex)
        }
      case createDatabaseOperation: CreateDatabaseOperation =>
        val catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName)
        val exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString)
        try {
          catalog.createDatabase(
            createDatabaseOperation.getDatabaseName,
            createDatabaseOperation.getCatalogDatabase,
            createDatabaseOperation.isIgnoreIfExists)
          TableResultImpl.TABLE_RESULT_OK
        } catch {
          case ex: DatabaseAlreadyExistException => throw new ValidationException(exMsg, ex)
          case ex: Exception => throw new TableException(exMsg, ex)
        }
      case dropDatabaseOperation: DropDatabaseOperation =>
        val catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName)
        val exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString)
        try {
          catalog.dropDatabase(
            dropDatabaseOperation.getDatabaseName,
            dropDatabaseOperation.isIfExists,
            dropDatabaseOperation.isCascade)
          TableResultImpl.TABLE_RESULT_OK
        } catch {
          case ex: DatabaseNotEmptyException => throw new ValidationException(exMsg, ex)
          case ex: DatabaseNotExistException => throw new ValidationException(exMsg, ex)
          case ex: Exception => throw new TableException(exMsg, ex)
        }
      case alterDatabaseOperation: AlterDatabaseOperation =>
        val catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName)
        val exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString)
        try {
          catalog.alterDatabase(
            alterDatabaseOperation.getDatabaseName,
            alterDatabaseOperation.getCatalogDatabase,
            false)
          TableResultImpl.TABLE_RESULT_OK
        } catch {
          case ex: DatabaseNotExistException => throw new ValidationException(exMsg, ex)
          case ex: Exception => throw new TableException(exMsg, ex)
        }
      case createFunctionOperation: CreateCatalogFunctionOperation =>
        createCatalogFunction(createFunctionOperation)
      case createTempSystemFunctionOperation: CreateTempSystemFunctionOperation =>
        createSystemFunction(createTempSystemFunctionOperation)
      case dropFunctionOperation: DropCatalogFunctionOperation =>
        dropCatalogFunction(dropFunctionOperation)
      case dropTempSystemFunctionOperation: DropTempSystemFunctionOperation =>
        dropSystemFunction(dropTempSystemFunctionOperation)
      case alterFunctionOperation: AlterCatalogFunctionOperation =>
        alterCatalogFunction(alterFunctionOperation)
      case useCatalogOperation: UseCatalogOperation =>
        catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName)
        TableResultImpl.TABLE_RESULT_OK
      case useDatabaseOperation: UseDatabaseOperation =>
        catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName)
        catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName)
        TableResultImpl.TABLE_RESULT_OK
      case _: ShowCatalogsOperation =>
        buildShowResult("catalog name", listCatalogs())
      case _: ShowCurrentCatalogOperation =>
        buildShowResult("current catalog name", Array(catalogManager.getCurrentCatalog))
      case _: ShowDatabasesOperation =>
        buildShowResult("database name", listDatabases())
      case _: ShowCurrentDatabaseOperation =>
        buildShowResult("current database name", Array(catalogManager.getCurrentDatabase))
      case _: ShowTablesOperation =>
        buildShowResult("table name", listTables())
      case _: ShowFunctionsOperation =>
        buildShowResult("function name", listFunctions())
      case createViewOperation: CreateViewOperation =>
        if (createViewOperation.isTemporary) {
          catalogManager.createTemporaryTable(
            createViewOperation.getCatalogView,
            createViewOperation.getViewIdentifier,
            createViewOperation.isIgnoreIfExists)
        } else {
          catalogManager.createTable(
            createViewOperation.getCatalogView,
            createViewOperation.getViewIdentifier,
            createViewOperation.isIgnoreIfExists)
        }
        TableResultImpl.TABLE_RESULT_OK
      case dropViewOperation: DropViewOperation =>
        if (dropViewOperation.isTemporary) {
          catalogManager.dropTemporaryView(
            dropViewOperation.getViewIdentifier,
            dropViewOperation.isIfExists)
        } else {
          catalogManager.dropView(
            dropViewOperation.getViewIdentifier,
            dropViewOperation.isIfExists)
        }
        TableResultImpl.TABLE_RESULT_OK
      case _: ShowViewsOperation =>
        buildShowResult("view name", listViews())
      case explainOperation: ExplainOperation =>
        val explanation = explainInternal(JCollections.singletonList(explainOperation.getChild))
        TableResultImpl.builder.
          resultKind(ResultKind.SUCCESS_WITH_CONTENT)
          .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
          .data(JCollections.singletonList(Row.of(explanation)))
          .setPrintStyle(PrintStyle.rawContent())
          .build
      case descOperation: DescribeTableOperation =>
        val result = catalogManager.getTable(descOperation.getSqlIdentifier)
        if (result.isPresent) {
          buildDescribeResult(result.get.getTable.getSchema)
        } else {
          throw new ValidationException(String.format(
            "Table or view with identifier '%s' doesn't exist",
            descOperation.getSqlIdentifier.asSummaryString()))
        }
      case queryOperation: QueryOperation =>
        executeInternal(queryOperation)

      case _ =>
        throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
    }
  }