in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala [1541:1796]
/**
  * Selects the runtime aggregate function implementation for a SQL aggregate call.
  *
  * The implementation is chosen from the kind of `aggFunc`, the SQL type of the first
  * input field and, for SUM, SUM0, MIN and MAX, whether retraction is needed.
  *
  * @param aggFunc        the SQL aggregate function to translate
  * @param needRetraction whether the "WithRetract" variant must be used for
  *                       SUM, SUM0, MIN and MAX; not consulted for the other aggregates
  * @param inputDataType  types of the aggregate's input fields; only the first entry is
  *                       consulted, and only by aggregates that depend on the input type
  * @param tableConfig    supplies the decimal context used by the DECIMAL AVG aggregate
  * @return the aggregate function instance matching `aggFunc`
  * @throws TableException if the aggregate needs an input type but `inputDataType` is
  *                        empty, if the input type is not supported by the aggregate,
  *                        or if the aggregate function itself is not supported
  */
private def createFlinkAggFunction(
    aggFunc: SqlAggFunction,
    needRetraction: Boolean,
    inputDataType: Seq[RelDataType],
    tableConfig: TableConfig)
  : ImperativeAggregateFunction[_ <: Any, _ <: Any] = {

  // Resolved lazily: the COUNT and user-defined aggregate branches never read the input
  // type, so an empty input list is only an error for the branches that do.
  // The emptiness check lives here (rather than on the type name only) so that every
  // branch reading the input type reports the same TableException.
  lazy val outputType: RelDataType = inputDataType.headOption.getOrElse(
    throw new TableException("Aggregate fields should not be empty."))
  lazy val outputTypeName: SqlTypeName = outputType.getSqlTypeName

  aggFunc match {

    case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
      new CollectAggFunction(FlinkTypeFactory.toTypeInfo(outputType))

    // user-defined aggregates already carry their implementation
    case udagg: AggSqlFunction =>
      udagg.getFunction

    case _: SqlCountAggFunction =>
      new CountAggFunction

    case _: SqlSumAggFunction =>
      if (needRetraction) {
        outputTypeName match {
          case TINYINT =>
            new ByteSumWithRetractAggFunction
          case SMALLINT =>
            new ShortSumWithRetractAggFunction
          case INTEGER =>
            new IntSumWithRetractAggFunction
          case BIGINT =>
            new LongSumWithRetractAggFunction
          case FLOAT =>
            new FloatSumWithRetractAggFunction
          case DOUBLE =>
            new DoubleSumWithRetractAggFunction
          case DECIMAL =>
            new DecimalSumWithRetractAggFunction
          case sqlType: SqlTypeName =>
            throw new TableException(s"Sum aggregate does no support type: '$sqlType'")
        }
      } else {
        outputTypeName match {
          case TINYINT =>
            new ByteSumAggFunction
          case SMALLINT =>
            new ShortSumAggFunction
          case INTEGER =>
            new IntSumAggFunction
          case BIGINT =>
            new LongSumAggFunction
          case FLOAT =>
            new FloatSumAggFunction
          case DOUBLE =>
            new DoubleSumAggFunction
          case DECIMAL =>
            new DecimalSumAggFunction
          case sqlType: SqlTypeName =>
            throw new TableException(s"Sum aggregate does no support type: '$sqlType'")
        }
      }

    case _: SqlSumEmptyIsZeroAggFunction =>
      if (needRetraction) {
        outputTypeName match {
          case TINYINT =>
            new ByteSum0WithRetractAggFunction
          case SMALLINT =>
            new ShortSum0WithRetractAggFunction
          case INTEGER =>
            new IntSum0WithRetractAggFunction
          case BIGINT =>
            new LongSum0WithRetractAggFunction
          case FLOAT =>
            new FloatSum0WithRetractAggFunction
          case DOUBLE =>
            new DoubleSum0WithRetractAggFunction
          case DECIMAL =>
            new DecimalSum0WithRetractAggFunction
          case sqlType: SqlTypeName =>
            throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'")
        }
      } else {
        outputTypeName match {
          case TINYINT =>
            new ByteSum0AggFunction
          case SMALLINT =>
            new ShortSum0AggFunction
          case INTEGER =>
            new IntSum0AggFunction
          case BIGINT =>
            new LongSum0AggFunction
          case FLOAT =>
            new FloatSum0AggFunction
          case DOUBLE =>
            new DoubleSum0AggFunction
          case DECIMAL =>
            new DecimalSum0AggFunction
          case sqlType: SqlTypeName =>
            throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'")
        }
      }

    // Only plain AVG is handled here; a SqlAvgAggFunction of any other kind falls through
    // to the "Unsupported Function" case below. The same implementation is used whether
    // or not retraction is needed.
    case a: SqlAvgAggFunction if a.kind == SqlKind.AVG =>
      outputTypeName match {
        case TINYINT =>
          new ByteAvgAggFunction
        case SMALLINT =>
          new ShortAvgAggFunction
        case INTEGER =>
          new IntAvgAggFunction
        case BIGINT =>
          new LongAvgAggFunction
        case FLOAT =>
          new FloatAvgAggFunction
        case DOUBLE =>
          new DoubleAvgAggFunction
        case DECIMAL =>
          new DecimalAvgAggFunction(tableConfig.getDecimalContext)
        case sqlType: SqlTypeName =>
          throw new TableException(s"Avg aggregate does no support type: '$sqlType'")
      }

    // Any SqlMinMaxAggFunction whose kind is not MIN is treated as MAX.
    case sqlMinMaxFunction: SqlMinMaxAggFunction =>
      if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
        if (needRetraction) {
          outputTypeName match {
            case TINYINT =>
              new ByteMinWithRetractAggFunction
            case SMALLINT =>
              new ShortMinWithRetractAggFunction
            case INTEGER =>
              new IntMinWithRetractAggFunction
            case BIGINT =>
              new LongMinWithRetractAggFunction
            case FLOAT =>
              new FloatMinWithRetractAggFunction
            case DOUBLE =>
              new DoubleMinWithRetractAggFunction
            case DECIMAL =>
              new DecimalMinWithRetractAggFunction
            case BOOLEAN =>
              new BooleanMinWithRetractAggFunction
            case VARCHAR | CHAR =>
              new StringMinWithRetractAggFunction
            case TIMESTAMP =>
              new TimestampMinWithRetractAggFunction
            case DATE =>
              new DateMinWithRetractAggFunction
            case TIME =>
              new TimeMinWithRetractAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(
                s"Min with retract aggregate does no support type: '$sqlType'")
          }
        } else {
          outputTypeName match {
            case TINYINT =>
              new ByteMinAggFunction
            case SMALLINT =>
              new ShortMinAggFunction
            case INTEGER =>
              new IntMinAggFunction
            case BIGINT =>
              new LongMinAggFunction
            case FLOAT =>
              new FloatMinAggFunction
            case DOUBLE =>
              new DoubleMinAggFunction
            case DECIMAL =>
              new DecimalMinAggFunction
            case BOOLEAN =>
              new BooleanMinAggFunction
            case VARCHAR | CHAR =>
              new StringMinAggFunction
            case TIMESTAMP =>
              new TimestampMinAggFunction
            case DATE =>
              new DateMinAggFunction
            case TIME =>
              new TimeMinAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Min aggregate does no support type: '$sqlType'")
          }
        }
      } else {
        if (needRetraction) {
          outputTypeName match {
            case TINYINT =>
              new ByteMaxWithRetractAggFunction
            case SMALLINT =>
              new ShortMaxWithRetractAggFunction
            case INTEGER =>
              new IntMaxWithRetractAggFunction
            case BIGINT =>
              new LongMaxWithRetractAggFunction
            case FLOAT =>
              new FloatMaxWithRetractAggFunction
            case DOUBLE =>
              new DoubleMaxWithRetractAggFunction
            case DECIMAL =>
              new DecimalMaxWithRetractAggFunction
            case BOOLEAN =>
              new BooleanMaxWithRetractAggFunction
            case VARCHAR | CHAR =>
              new StringMaxWithRetractAggFunction
            case TIMESTAMP =>
              new TimestampMaxWithRetractAggFunction
            case DATE =>
              new DateMaxWithRetractAggFunction
            case TIME =>
              new TimeMaxWithRetractAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(
                s"Max with retract aggregate does no support type: '$sqlType'")
          }
        } else {
          outputTypeName match {
            case TINYINT =>
              new ByteMaxAggFunction
            case SMALLINT =>
              new ShortMaxAggFunction
            case INTEGER =>
              new IntMaxAggFunction
            case BIGINT =>
              new LongMaxAggFunction
            case FLOAT =>
              new FloatMaxAggFunction
            case DOUBLE =>
              new DoubleMaxAggFunction
            case DECIMAL =>
              new DecimalMaxAggFunction
            case BOOLEAN =>
              new BooleanMaxAggFunction
            case VARCHAR | CHAR =>
              new StringMaxAggFunction
            case TIMESTAMP =>
              new TimestampMaxAggFunction
            case DATE =>
              new DateMaxAggFunction
            case TIME =>
              new TimeMaxAggFunction
            case sqlType: SqlTypeName =>
              throw new TableException(s"Max aggregate does no support type: '$sqlType'")
          }
        }
      }

    case unSupported: SqlAggFunction =>
      throw new TableException(s"Unsupported Function: '${unSupported.getName}'")
  }
}