in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala [144:345]
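/**
 * Converts a [[FlinkLogicalWindowAggregate]] over a time window into batch physical
 * window-aggregate nodes. Depending on the table configuration and on whether a local
 * aggregate is supported, it emits a two-phase plan (local agg + global agg) and/or a
 * one-phase plan, each in a hash-based or sort-based variant.
 */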
private def transformTimeSlidingWindow(
call: RelOptRuleCall,
input: RelNode,
agg: FlinkLogicalWindowAggregate,
window: LogicalWindow,
auxGroupSet: Array[Int],
aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
aggBufferTypes: Array[Array[LogicalType]],
preferHashExec: Boolean,
enableAssignPane: Boolean,
supportLocalAgg: Boolean): Unit = {
val groupSet = agg.getGroupSet.toArray
val aggregates = aggCallToAggFunction.map(_._2).toArray
// TODO the aggregate includes a projection for now, so it is safe not to provide a new trait
val aggProvidedTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
val inputTimeFieldIndex = AggregateUtil.timeFieldIndex(
input.getRowType, call.builder(), window.timeAttribute)
val inputTimeFieldType = agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType
val inputTimeIsDate = inputTimeFieldType.getSqlTypeName == SqlTypeName.DATE
// local-agg output order: groupSet | assignTs | auxGroupSet | aggCalls
val newInputTimeFieldIndexFromLocal = groupSet.length
val config = input.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
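// two-phase plan: local aggregate followed by a global aggregate, emitted unless
// one-phase aggregation is enforced or no local aggregate is supported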
if (!isEnforceOnePhaseAgg(config) && supportLocalAgg) {
val windowType = if (inputTimeIsDate) new IntType() else new BigIntType()
// local
var localRequiredTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
// local win-agg output order: groupSet + assignTs + auxGroupSet + aggCalls
val localAggRelType = inferLocalWindowAggType(enableAssignPane, input.getRowType, agg,
groupSet, auxGroupSet, windowType, aggregates, aggBufferTypes)
val localAgg = if (preferHashExec) {
// hash
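// the local hash aggregate only requires the input in the batch physical convention;
// no collation is needed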
val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet)
val localProvidedTraitSet = localRequiredTraitSet
new BatchExecLocalHashWindowAggregate(
agg.getCluster,
call.builder(),
localProvidedTraitSet,
newLocalInput,
localAggRelType,
newLocalInput.getRowType,
groupSet,
auxGroupSet,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane)
} else {
// sort
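// the local sort aggregate requires the input sorted on the grouping keys plus the time field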
localRequiredTraitSet = localRequiredTraitSet
.replace(createRelCollation(groupSet :+ inputTimeFieldIndex))
val newLocalInput = RelOptRule.convert(input, localRequiredTraitSet)
val localProvidedTraitSet = localRequiredTraitSet
new BatchExecLocalSortWindowAggregate(
agg.getCluster,
call.builder(),
localProvidedTraitSet,
newLocalInput,
localAggRelType,
newLocalInput.getRowType,
groupSet,
auxGroupSet,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane)
}
// global
var globalRequiredTraitSet = localAgg.getTraitSet
// distribute by the grouping keys, or use a singleton distribution if there are none
val distributionFields = if (agg.getGroupCount != 0) {
// global agg should use groupSet's indices as distribution fields
val globalGroupSet = groupSet.indices
FlinkRelDistribution.hash(globalGroupSet.map(Integer.valueOf), requireStrict = false)
} else {
FlinkRelDistribution.SINGLETON
}
globalRequiredTraitSet = globalRequiredTraitSet.replace(distributionFields)
val globalAgg = if (preferHashExec) {
// hash
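// the global hash aggregate merges the local results; its grouping keys are the
// first groupSet.length fields of the local output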
val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet)
new BatchExecHashWindowAggregate(
agg.getCluster,
call.builder(),
aggProvidedTraitSet,
newGlobalAggInput,
agg.getRowType,
newGlobalAggInput.getRowType,
input.getRowType,
groupSet.indices.toArray,
// auxGroupSet starts after the grouping keys and the assignTs field, i.e. at index groupSet.length + 1
(groupSet.length + 1 until groupSet.length + 1 + auxGroupSet.length).toArray,
aggCallToAggFunction,
window,
newInputTimeFieldIndexFromLocal,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane,
isMerge = true)
} else {
// sort
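// the global sort aggregate requires its input sorted on the local output's grouping keys
// plus the assigned timestamp at index groupSet.length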
globalRequiredTraitSet = globalRequiredTraitSet
.replace(RelCollations.EMPTY)
.replace(createRelCollation(groupSet.indices.toArray :+ groupSet.length))
val newGlobalAggInput = RelOptRule.convert(localAgg, globalRequiredTraitSet)
new BatchExecSortWindowAggregate(
agg.getCluster,
call.builder(),
aggProvidedTraitSet,
newGlobalAggInput,
agg.getRowType,
newGlobalAggInput.getRowType,
input.getRowType,
groupSet.indices.toArray,
// auxGroupSet starts after the grouping keys and the assignTs field, i.e. at index groupSet.length + 1
(groupSet.length + 1 until groupSet.length + 1 + auxGroupSet.length).toArray,
aggCallToAggFunction,
window,
newInputTimeFieldIndexFromLocal,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane,
isMerge = true)
}
call.transformTo(globalAgg)
}
// one-phase agg, emitted unless two-phase agg is enforced and a local agg is supported
if (!isEnforceTwoPhaseAgg(config) || !supportLocalAgg) {
var requiredTraitSet = agg.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
// distribute by grouping keys
requiredTraitSet = if (agg.getGroupCount != 0) {
requiredTraitSet.replace(
FlinkRelDistribution.hash(groupSet.map(Integer.valueOf).toList, requireStrict = false))
} else {
requiredTraitSet.replace(FlinkRelDistribution.SINGLETON)
}
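// use hash agg only when it is preferred and no pane is assigned; otherwise fall back to sort agg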
val windowAgg = if (preferHashExec && !enableAssignPane) {
// case 1: tumbling window, or sliding window with windowSize >= slideSize
// case 2: sliding window without pane optimization
val newInput = RelOptRule.convert(input, requiredTraitSet)
new BatchExecHashWindowAggregate(
agg.getCluster,
call.builder(),
aggProvidedTraitSet,
newInput,
agg.getRowType,
newInput.getRowType,
newInput.getRowType,
groupSet,
auxGroupSet,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane,
isMerge = false)
} else {
// sort by grouping keys and time field in ascending direction
requiredTraitSet = requiredTraitSet.replace(createRelCollation(
groupSet :+ inputTimeFieldIndex))
val newInput = RelOptRule.convert(input, requiredTraitSet)
new BatchExecSortWindowAggregate(
agg.getCluster,
call.builder(),
aggProvidedTraitSet,
newInput,
agg.getRowType,
newInput.getRowType,
newInput.getRowType,
groupSet,
auxGroupSet,
aggCallToAggFunction,
window,
inputTimeFieldIndex,
inputTimeIsDate,
agg.getNamedProperties,
enableAssignPane,
isMerge = false)
}
call.transformTo(windowAgg)
}
}