in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala [634:786]
/**
  * Creates the [[RichGroupReduceFunction]] that evaluates a group window aggregation on a
  * DataSet (batch) input.
  *
  * Two aggregation helpers are generated up front with [[AggregationCodeGenerator]], one with
  * `partialResults = true` and one with `partialResults = false`. The reduce function is then
  * selected from the kind of `window`:
  *
  *  - tumbling window whose size is a time interval: a reduce/combine function when
  *    `doAllSupportPartialMerge` holds for the aggregate functions, a plain reduce function
  *    otherwise
  *  - tumbling window with any other size type: the count window reduce function
  *  - session window: the session window reduce function
  *  - sliding window whose size is a time interval: a reduce/combine function when
  *    `doAllSupportPartialMerge` holds, a plain reduce function otherwise
  *  - sliding window with any other size type: the plain slide reduce function, created
  *    without window property positions
  *
  * @param config               configuration handed to the [[AggregationCodeGenerator]]
  * @param nullableInput        forwarded unchanged to the code generator
  * @param inputTypeInfo        forwarded unchanged to the code generator
  * @param constants            forwarded unchanged to the code generator
  * @param window               the logical window that decides which function is created
  * @param namedAggregates      aggregate calls with their names; the calls are passed to
  *                             `extractAggregateMetadata` and their count contributes to the
  *                             keys-and-aggregates arity
  * @param physicalInputRowType passed to `extractAggregateMetadata`
  * @param physicalInputTypes   passed to the code generator; its length is passed to
  *                             `extractAggregateMetadata`
  * @param outputType           its field count is used as the output arity
  * @param groupings            its length is used for the adjusted mapping and the
  *                             keys-and-aggregates arity; `groupings.indices` is passed to the
  *                             code generator
  * @param properties           window properties passed to `computeWindowPropertyPos`
  * @param tableConfig          configuration passed to `extractAggregateMetadata`
  * @param isInputCombined      forwarded to the session window function only; every other
  *                             branch ignores it
  * @return the reduce function matching the given window
  * @throws UnsupportedOperationException if `window` is neither a tumbling, session nor sliding
  *                                       group window
  */
def createDataSetWindowAggregationGroupReduceFunction(
    config: TableConfig,
    nullableInput: Boolean,
    inputTypeInfo: TypeInformation[_ <: Any],
    constants: Option[Seq[RexLiteral]],
    window: LogicalWindow,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    physicalInputRowType: RelDataType,
    physicalInputTypes: Seq[TypeInformation[_]],
    outputType: RelDataType,
    groupings: Array[Int],
    properties: Seq[NamedWindowProperty],
    tableConfig: TableConfig,
    isInputCombined: Boolean = false)
  : RichGroupReduceFunction[Row, Row] = {

  // Retraction is switched off for the metadata extraction and for both generated helpers.
  val retraction = false

  val metadata = extractAggregateMetadata(
    namedAggregates.map(_.getKey),
    physicalInputRowType,
    physicalInputTypes.length,
    retraction,
    tableConfig)

  // The same mapping is used as aggregate mapping and as the (optional) forward mapping below.
  val adjustedMapping = metadata.getAdjustedMapping(groupings.length, partialResults = true)

  // Generates one aggregation helper. Both variants share every argument except the
  // `partialResults` flag and the output arity: the partial variant adds the distinct
  // accumulator count to the output field count.
  def generateHelper(partial: Boolean) = {
    val generator = new AggregationCodeGenerator(
      config,
      nullableInput,
      inputTypeInfo,
      constants,
      "GroupingWindowAggregateHelper",
      physicalInputTypes,
      metadata.getAggregateFunctions,
      metadata.getAggregateIndices,
      adjustedMapping,
      metadata.getDistinctAccMapping,
      isStateBackedDataViews = false,
      partialResults = partial,
      groupings.indices.toArray,
      Some(adjustedMapping),
      if (partial) {
        outputType.getFieldCount + metadata.getDistinctAccCount
      } else {
        outputType.getFieldCount
      },
      retraction,
      needMerge = true,
      needReset = true,
      None
    )
    generator.generateAggregations
  }

  // Both helpers are always generated, partial one first, regardless of the branch taken below.
  val preAggregation = generateHelper(partial = true)
  val finalAggregation = generateHelper(partial = false)

  val keyAndAggArity = groupings.length + namedAggregates.length

  // A def on purpose: the check only runs in the time-based branches that ask for it.
  def partialMergeSupported = doAllSupportPartialMerge(metadata.getAggregateFunctions)

  window match {
    case TumblingGroupWindow(_, _, windowSize) =>
      if (isTimeInterval(windowSize.resultType)) {
        // time-based tumbling window
        val (posStart, posEnd, posTime) = computeWindowPropertyPos(properties)
        // NOTE(review): the combine variant receives the keys-and-aggregates arity while the
        // plain variant receives the full output field count -- kept exactly as found.
        if (partialMergeSupported) {
          // incremental: pre-aggregate in the combiner, finish in the reducer
          new DataSetTumbleTimeWindowAggReduceCombineFunction(
            preAggregation,
            finalAggregation,
            asLong(windowSize),
            posStart,
            posEnd,
            posTime,
            keyAndAggArity)
        } else {
          // non-incremental: only the final helper is used
          new DataSetTumbleTimeWindowAggReduceGroupFunction(
            finalAggregation,
            asLong(windowSize),
            posStart,
            posEnd,
            posTime,
            outputType.getFieldCount)
        }
      } else {
        // count-based tumbling window
        new DataSetTumbleCountWindowAggReduceGroupFunction(
          finalAggregation,
          asLong(windowSize))
      }

    case SessionGroupWindow(_, _, sessionGap) =>
      val (posStart, posEnd, posTime) = computeWindowPropertyPos(properties)
      new DataSetSessionWindowAggReduceGroupFunction(
        finalAggregation,
        keyAndAggArity,
        posStart,
        posEnd,
        posTime,
        asLong(sessionGap),
        isInputCombined)

    case SlidingGroupWindow(_, _, windowSize, _) =>
      if (isTimeInterval(windowSize.resultType)) {
        // time-based sliding window
        val (posStart, posEnd, posTime) = computeWindowPropertyPos(properties)
        if (partialMergeSupported) {
          // partial aggregation: combiner plus reducer
          new DataSetSlideWindowAggReduceCombineFunction(
            preAggregation,
            finalAggregation,
            keyAndAggArity,
            posStart,
            posEnd,
            posTime,
            asLong(windowSize))
        } else {
          // no partial aggregation: reducer only
          new DataSetSlideWindowAggReduceGroupFunction(
            finalAggregation,
            keyAndAggArity,
            posStart,
            posEnd,
            posTime,
            asLong(windowSize))
        }
      } else {
        // any other size type: no window property positions are supplied
        new DataSetSlideWindowAggReduceGroupFunction(
          finalAggregation,
          keyAndAggArity,
          None,
          None,
          None,
          asLong(windowSize))
      }

    case _ =>
      throw new UnsupportedOperationException(s"$window is currently not supported on batch")
  }
}