private def createFlinkAggFunction()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala [1541:1796]


  private def createFlinkAggFunction(
      aggFunc: SqlAggFunction,
      needRetraction: Boolean,
      inputDataType: Seq[RelDataType],
      tableConfig: TableConfig)
    : ImperativeAggregateFunction[_ <: Any, _ <: Any] = {

    lazy val outputType = inputDataType.get(0)
    lazy val outputTypeName = if (inputDataType.isEmpty) {
      throw new TableException("Aggregate fields should not be empty.")
    } else {
      outputType.getSqlTypeName
    }

    aggFunc match {

      case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
        new CollectAggFunction(FlinkTypeFactory.toTypeInfo(outputType))

      case udagg: AggSqlFunction =>
        udagg.getFunction

      case _: SqlCountAggFunction =>
        new CountAggFunction

      case _: SqlSumAggFunction =>
        if (needRetraction) {
          outputTypeName match {
            case TINYINT =>
              new ByteSumWithRetractAggFunction
            case SMALLINT =>
              new ShortSumWithRetractAggFunction
            case INTEGER =>
              new IntSumWithRetractAggFunction
            case BIGINT =>
              new LongSumWithRetractAggFunction
            case FLOAT =>
              new FloatSumWithRetractAggFunction
            case DOUBLE =>
              new DoubleSumWithRetractAggFunction
            case DECIMAL =>
              new DecimalSumWithRetractAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Sum aggregate does no support type: '$sqlType'")
          }
        } else {
          outputTypeName match {
            case TINYINT =>
              new ByteSumAggFunction
            case SMALLINT =>
              new ShortSumAggFunction
            case INTEGER =>
              new IntSumAggFunction
            case BIGINT =>
              new LongSumAggFunction
            case FLOAT =>
              new FloatSumAggFunction
            case DOUBLE =>
              new DoubleSumAggFunction
            case DECIMAL =>
              new DecimalSumAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Sum aggregate does no support type: '$sqlType'")
          }
        }

      case _: SqlSumEmptyIsZeroAggFunction =>
        if (needRetraction) {
          outputTypeName match {
            case TINYINT =>
              new ByteSum0WithRetractAggFunction
            case SMALLINT =>
              new ShortSum0WithRetractAggFunction
            case INTEGER =>
              new IntSum0WithRetractAggFunction
            case BIGINT =>
              new LongSum0WithRetractAggFunction
            case FLOAT =>
              new FloatSum0WithRetractAggFunction
            case DOUBLE =>
              new DoubleSum0WithRetractAggFunction
            case DECIMAL =>
              new DecimalSum0WithRetractAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'")
          }
        } else {
          outputTypeName match {
            case TINYINT =>
              new ByteSum0AggFunction
            case SMALLINT =>
              new ShortSum0AggFunction
            case INTEGER =>
              new IntSum0AggFunction
            case BIGINT =>
              new LongSum0AggFunction
            case FLOAT =>
              new FloatSum0AggFunction
            case DOUBLE =>
              new DoubleSum0AggFunction
            case DECIMAL =>
              new DecimalSum0AggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'")
          }
        }

      case a: SqlAvgAggFunction if a.kind == SqlKind.AVG =>
        outputTypeName match {
          case TINYINT =>
            new ByteAvgAggFunction
          case SMALLINT =>
            new ShortAvgAggFunction
          case INTEGER =>
            new IntAvgAggFunction
          case BIGINT =>
            new LongAvgAggFunction
          case FLOAT =>
            new FloatAvgAggFunction
          case DOUBLE =>
            new DoubleAvgAggFunction
          case DECIMAL =>
            new DecimalAvgAggFunction(tableConfig.getDecimalContext)
          case sqlType: SqlTypeName =>
            throw new TableException(s"Avg aggregate does no support type: '$sqlType'")
        }

      case sqlMinMaxFunction: SqlMinMaxAggFunction =>
        if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
          if (needRetraction) {
            outputTypeName match {
              case TINYINT =>
                new ByteMinWithRetractAggFunction
              case SMALLINT =>
                new ShortMinWithRetractAggFunction
              case INTEGER =>
                new IntMinWithRetractAggFunction
              case BIGINT =>
                new LongMinWithRetractAggFunction
              case FLOAT =>
                new FloatMinWithRetractAggFunction
              case DOUBLE =>
                new DoubleMinWithRetractAggFunction
              case DECIMAL =>
                new DecimalMinWithRetractAggFunction
              case BOOLEAN =>
                new BooleanMinWithRetractAggFunction
              case VARCHAR | CHAR =>
                new StringMinWithRetractAggFunction
              case TIMESTAMP =>
                new TimestampMinWithRetractAggFunction
              case DATE =>
                new DateMinWithRetractAggFunction
              case TIME =>
                new TimeMinWithRetractAggFunction
              case sqlType: SqlTypeName =>
                throw new TableException(
                  s"Min with retract aggregate does no support type: '$sqlType'")
            }
          } else {
            outputTypeName match {
              case TINYINT =>
                new ByteMinAggFunction
              case SMALLINT =>
                new ShortMinAggFunction
              case INTEGER =>
                new IntMinAggFunction
              case BIGINT =>
                new LongMinAggFunction
              case FLOAT =>
                new FloatMinAggFunction
              case DOUBLE =>
                new DoubleMinAggFunction
              case DECIMAL =>
                new DecimalMinAggFunction
              case BOOLEAN =>
                new BooleanMinAggFunction
              case VARCHAR | CHAR =>
                new StringMinAggFunction
              case TIMESTAMP =>
                new TimestampMinAggFunction
              case DATE =>
                new DateMinAggFunction
              case TIME =>
                new TimeMinAggFunction
              case sqlType: SqlTypeName =>
                throw new TableException(s"Min aggregate does no support type: '$sqlType'")
            }
          }
        } else {
          if (needRetraction) {
            outputTypeName match {
              case TINYINT =>
                new ByteMaxWithRetractAggFunction
              case SMALLINT =>
                new ShortMaxWithRetractAggFunction
              case INTEGER =>
                new IntMaxWithRetractAggFunction
              case BIGINT =>
                new LongMaxWithRetractAggFunction
              case FLOAT =>
                new FloatMaxWithRetractAggFunction
              case DOUBLE =>
                new DoubleMaxWithRetractAggFunction
              case DECIMAL =>
                new DecimalMaxWithRetractAggFunction
              case BOOLEAN =>
                new BooleanMaxWithRetractAggFunction
              case VARCHAR | CHAR =>
                new StringMaxWithRetractAggFunction
              case TIMESTAMP =>
                new TimestampMaxWithRetractAggFunction
              case DATE =>
                new DateMaxWithRetractAggFunction
              case TIME =>
                new TimeMaxWithRetractAggFunction
              case sqlType: SqlTypeName =>
                throw new TableException(
                  s"Max with retract aggregate does no support type: '$sqlType'")
            }
          } else {
            outputTypeName match {
              case TINYINT =>
                new ByteMaxAggFunction
              case SMALLINT =>
                new ShortMaxAggFunction
              case INTEGER =>
                new IntMaxAggFunction
              case BIGINT =>
                new LongMaxAggFunction
              case FLOAT =>
                new FloatMaxAggFunction
              case DOUBLE =>
                new DoubleMaxAggFunction
              case DECIMAL =>
                new DecimalMaxAggFunction
              case BOOLEAN =>
                new BooleanMaxAggFunction
              case VARCHAR | CHAR =>
                new StringMaxAggFunction
              case TIMESTAMP =>
                new TimestampMaxAggFunction
              case DATE =>
                new DateMaxAggFunction
              case TIME =>
                new TimeMaxAggFunction
              case sqlType: SqlTypeName =>
                throw new TableException(s"Max aggregate does no support type: '$sqlType'")
            }
          }
        }

      case unSupported: SqlAggFunction =>
        throw new TableException(s"Unsupported Function: '${unSupported.getName}'")
    }
  }