def createDataSetWindowAggregationGroupReduceFunction()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala [634:786]


  def createDataSetWindowAggregationGroupReduceFunction(
      config: TableConfig,
      nullableInput: Boolean,
      inputTypeInfo: TypeInformation[_ <: Any],
      constants: Option[Seq[RexLiteral]],
      window: LogicalWindow,
      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
      physicalInputRowType: RelDataType,
      physicalInputTypes: Seq[TypeInformation[_]],
      outputType: RelDataType,
      groupings: Array[Int],
      properties: Seq[NamedWindowProperty],
      tableConfig: TableConfig,
      isInputCombined: Boolean = false)
    : RichGroupReduceFunction[Row, Row] = {

    val needRetract = false
    val aggregateMetadata = extractAggregateMetadata(
      namedAggregates.map(_.getKey),
      physicalInputRowType,
      physicalInputTypes.length,
      needRetract,
      tableConfig)

    val aggMapping = aggregateMetadata.getAdjustedMapping(groupings.length, partialResults = true)

    val generatorPre = new AggregationCodeGenerator(
      config,
      nullableInput,
      inputTypeInfo,
      constants,
      "GroupingWindowAggregateHelper",
      physicalInputTypes,
      aggregateMetadata.getAggregateFunctions,
      aggregateMetadata.getAggregateIndices,
      aggMapping,
      aggregateMetadata.getDistinctAccMapping,
      isStateBackedDataViews = false,
      partialResults = true,
      groupings.indices.toArray,
      Some(aggMapping),
      outputType.getFieldCount + aggregateMetadata.getDistinctAccCount,
      needRetract,
      needMerge = true,
      needReset = true,
      None
    )
    val genPreAggFunction = generatorPre.generateAggregations

    val generatorFinal = new AggregationCodeGenerator(
      config,
      nullableInput,
      inputTypeInfo,
      constants,
      "GroupingWindowAggregateHelper",
      physicalInputTypes,
      aggregateMetadata.getAggregateFunctions,
      aggregateMetadata.getAggregateIndices,
      aggMapping,
      aggregateMetadata.getDistinctAccMapping,
      isStateBackedDataViews = false,
      partialResults = false,
      groupings.indices.toArray,
      Some(aggMapping),
      outputType.getFieldCount,
      needRetract,
      needMerge = true,
      needReset = true,
      None
    )
    val genFinalAggFunction = generatorFinal.generateAggregations

    val keysAndAggregatesArity = groupings.length + namedAggregates.length

    window match {
      case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
        // tumbling time window
        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
        if (doAllSupportPartialMerge(aggregateMetadata.getAggregateFunctions)) {
          // for incremental aggregations
          new DataSetTumbleTimeWindowAggReduceCombineFunction(
            genPreAggFunction,
            genFinalAggFunction,
            asLong(size),
            startPos,
            endPos,
            timePos,
            keysAndAggregatesArity)
        }
        else {
          // for non-incremental aggregations
          new DataSetTumbleTimeWindowAggReduceGroupFunction(
            genFinalAggFunction,
            asLong(size),
            startPos,
            endPos,
            timePos,
            outputType.getFieldCount)
        }
      case TumblingGroupWindow(_, _, size) =>
        // tumbling count window
        new DataSetTumbleCountWindowAggReduceGroupFunction(
          genFinalAggFunction,
          asLong(size))

      case SessionGroupWindow(_, _, gap) =>
        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
        new DataSetSessionWindowAggReduceGroupFunction(
          genFinalAggFunction,
          keysAndAggregatesArity,
          startPos,
          endPos,
          timePos,
          asLong(gap),
          isInputCombined)

      case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
        if (doAllSupportPartialMerge(aggregateMetadata.getAggregateFunctions)) {
          // for partial aggregations
          new DataSetSlideWindowAggReduceCombineFunction(
            genPreAggFunction,
            genFinalAggFunction,
            keysAndAggregatesArity,
            startPos,
            endPos,
            timePos,
            asLong(size))
        }
        else {
          // for non-partial aggregations
          new DataSetSlideWindowAggReduceGroupFunction(
            genFinalAggFunction,
            keysAndAggregatesArity,
            startPos,
            endPos,
            timePos,
            asLong(size))
        }

      case SlidingGroupWindow(_, _, size, _) =>
        new DataSetSlideWindowAggReduceGroupFunction(
            genFinalAggFunction,
            keysAndAggregatesArity,
            None,
            None,
            None,
            asLong(size))

      case _ =>
        throw new UnsupportedOperationException(s"$window is currently not supported on batch")
    }
  }