private def transformTimeSlidingWindow()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala [144:345]


  /**
   * Translates a logical window aggregate into batch physical window aggregate plan(s) and
   * registers every produced plan through `call.transformTo`.
   *
   * A single invocation may register up to two alternatives:
   *  - a two-phase plan (a local window aggregate feeding a merging global window aggregate),
   *    generated when one-phase aggregation is not enforced by the table config and
   *    `supportLocalAgg` is true;
   *  - a one-phase plan (one non-merging window aggregate), generated when two-phase
   *    aggregation is not enforced by the table config or `supportLocalAgg` is false.
   *
   * @param call rule call; supplies the `RelBuilder` instances and receives the new plans
   * @param input input node of the aggregate, converted to the traits each plan requires
   * @param agg logical window aggregate being translated (group set, row type, named
   *            properties and cluster are read from it)
   * @param window logical window whose time attribute locates the time field in `input`
   * @param auxGroupSet auxiliary grouping field indices, relative to `input`
   * @param aggCallToAggFunction aggregate calls paired with their functions
   * @param aggBufferTypes aggregate buffer types, used to infer the local aggregate row type
   * @param preferHashExec selects hash-based over sort-based operators; the one-phase plan
   *                       additionally needs `enableAssignPane` to be false to use hash
   * @param enableAssignPane forwarded to the physical nodes; when true the one-phase plan is
   *                         always sort-based
   * @param supportLocalAgg whether a two-phase (local + global) plan may be generated
   */
  private def transformTimeSlidingWindow(
      call: RelOptRuleCall,
      input: RelNode,
      agg: FlinkLogicalWindowAggregate,
      window: LogicalWindow,
      auxGroupSet: Array[Int],
      aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
      aggBufferTypes: Array[Array[LogicalType]],
      preferHashExec: Boolean,
      enableAssignPane: Boolean,
      supportLocalAgg: Boolean): Unit = {
    val groupKeys = agg.getGroupSet.toArray
    val aggFunctions = aggCallToAggFunction.map(_._2).toArray

    // TODO aggregate include projection now, so do not provide new trait will be safe
    val providedTraits = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)

    val timeFieldIdx = AggregateUtil.timeFieldIndex(
      input.getRowType, call.builder(), window.timeAttribute)
    val timeIsDate =
      agg.getInput.getRowType.getFieldList.get(timeFieldIdx).getType.getSqlTypeName ==
        SqlTypeName.DATE

    val tableConfig =
      input.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig

    // ---------------------------------------------------------------------------------------
    // two-phase plan: local window agg -> exchange -> global (merging) window agg
    // ---------------------------------------------------------------------------------------
    if (!isEnforceOnePhaseAgg(tableConfig) && supportLocalAgg) {
      val assignedTsType = if (timeIsDate) new IntType() else new BigIntType()

      // local win-agg output order: groupSet | assignTs | auxGroupSet | aggCalls
      val localBaseTraits = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
      val localRowType = inferLocalWindowAggType(enableAssignPane, input.getRowType, agg,
        groupKeys, auxGroupSet, assignedTsType, aggFunctions, aggBufferTypes)

      // the sort-based local agg needs its input ordered by grouping keys and time field;
      // the hash-based one has no ordering requirement
      val localTraits =
        if (preferHashExec) {
          localBaseTraits
        } else {
          localBaseTraits.replace(createRelCollation(groupKeys :+ timeFieldIdx))
        }
      val localInput = RelOptRule.convert(input, localTraits)

      val localAgg = if (preferHashExec) {
        new BatchExecLocalHashWindowAggregate(
          agg.getCluster,
          call.builder(),
          localTraits,
          localInput,
          localRowType,
          localInput.getRowType,
          groupKeys,
          auxGroupSet,
          aggCallToAggFunction,
          window,
          timeFieldIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane)
      } else {
        new BatchExecLocalSortWindowAggregate(
          agg.getCluster,
          call.builder(),
          localTraits,
          localInput,
          localRowType,
          localInput.getRowType,
          groupKeys,
          auxGroupSet,
          aggCallToAggFunction,
          window,
          timeFieldIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane)
      }

      // Field positions as seen by the global agg, i.e. relative to the local agg output:
      // grouping keys come first, the assigned timestamp follows them, and
      // auxGroupSet starts from `size of groupSet + 1(assignTs)`
      val assignedTsIdx = groupKeys.length
      val globalGroupKeys = Array.range(0, groupKeys.length)
      val firstAuxIdx = assignedTsIdx + 1
      val globalAuxKeys = Array.range(firstAuxIdx, firstAuxIdx + auxGroupSet.length)

      // distribute by grouping keys (using their indices in the local output) or singleton
      val globalDistribution = if (agg.getGroupCount != 0) {
        FlinkRelDistribution.hash(groupKeys.indices.map(Integer.valueOf), requireStrict = false)
      } else {
        FlinkRelDistribution.SINGLETON
      }
      val distributedLocalTraits = localAgg.getTraitSet.replace(globalDistribution)

      // the sort-based global agg additionally requires ordering on keys + assigned timestamp
      val globalTraits =
        if (preferHashExec) {
          distributedLocalTraits
        } else {
          distributedLocalTraits
            .replace(RelCollations.EMPTY)
            .replace(createRelCollation(globalGroupKeys :+ assignedTsIdx))
        }
      val globalInput = RelOptRule.convert(localAgg, globalTraits)

      val globalAgg = if (preferHashExec) {
        new BatchExecHashWindowAggregate(
          agg.getCluster,
          call.builder(),
          providedTraits,
          globalInput,
          agg.getRowType,
          globalInput.getRowType,
          input.getRowType,
          globalGroupKeys,
          globalAuxKeys,
          aggCallToAggFunction,
          window,
          assignedTsIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane,
          isMerge = true)
      } else {
        new BatchExecSortWindowAggregate(
          agg.getCluster,
          call.builder(),
          providedTraits,
          globalInput,
          agg.getRowType,
          globalInput.getRowType,
          input.getRowType,
          globalGroupKeys,
          globalAuxKeys,
          aggCallToAggFunction,
          window,
          assignedTsIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane,
          isMerge = true)
      }

      call.transformTo(globalAgg)
    }

    // ---------------------------------------------------------------------------------------
    // one-phase plan; skipped only when two-phase is enforced and local agg is supported
    // ---------------------------------------------------------------------------------------
    if (!(isEnforceTwoPhaseAgg(tableConfig) && supportLocalAgg)) {
      // distribute by grouping keys
      val onePhaseDistribution = if (agg.getGroupCount != 0) {
        FlinkRelDistribution.hash(groupKeys.map(Integer.valueOf).toList, requireStrict = false)
      } else {
        FlinkRelDistribution.SINGLETON
      }
      val distributedAggTraits = agg.getTraitSet
        .replace(FlinkConventions.BATCH_PHYSICAL)
        .replace(onePhaseDistribution)

      // hash execution is only chosen when it is preferred and pane assignment is disabled;
      // otherwise sort by grouping keys and time field in ascending direction
      val useHashExec = preferHashExec && !enableAssignPane
      val onePhaseTraits =
        if (useHashExec) {
          distributedAggTraits
        } else {
          distributedAggTraits.replace(createRelCollation(groupKeys :+ timeFieldIdx))
        }
      val onePhaseInput = RelOptRule.convert(input, onePhaseTraits)

      val windowAgg = if (useHashExec) {
        new BatchExecHashWindowAggregate(
          agg.getCluster,
          call.builder(),
          providedTraits,
          onePhaseInput,
          agg.getRowType,
          onePhaseInput.getRowType,
          onePhaseInput.getRowType,
          groupKeys,
          auxGroupSet,
          aggCallToAggFunction,
          window,
          timeFieldIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane,
          isMerge = false)
      } else {
        new BatchExecSortWindowAggregate(
          agg.getCluster,
          call.builder(),
          providedTraits,
          onePhaseInput,
          agg.getRowType,
          onePhaseInput.getRowType,
          onePhaseInput.getRowType,
          groupKeys,
          auxGroupSet,
          aggCallToAggFunction,
          window,
          timeFieldIdx,
          timeIsDate,
          agg.getNamedProperties,
          enableAssignPane,
          isMerge = false)
      }

      call.transformTo(windowAgg)
    }
  }