in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [393:607]
/**
 * Visits `rel` and, recursively, its inputs, trying to produce a node that satisfies the
 * [[UpdateKindTrait]] required by the parent.
 *
 * Each kind of physical node decides which update kind it requires from its inputs
 * (e.g. forwarding `requiredTrait`, requiring nothing, or requiring UPDATE_BEFORE when the
 * input produces updates) and which update kind it provides itself.
 *
 * @param rel           the physical node to visit
 * @param requiredTrait the update kind required from `rel` by its parent
 * @return `None` if the requirement cannot be satisfied by this node or by one of its
 *         inputs; otherwise the result of `createNewNode` / `visitSink` /
 *         `visitRankStrategies` for the rewritten node
 * @throws UnsupportedOperationException if a `StreamExecTableSourceScan` reports a changelog
 *                                       mode that maps to ONLY_UPDATE_AFTER, or if the node
 *                                       type is not handled by any branch
 */
def visit(
    rel: StreamPhysicalRel,
    requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match {
  case sink: StreamExecSink =>
    val childModifyKindSet = getModifyKindSet(sink.getInput)
    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
    val sinkTrait = UpdateKindTrait.fromChangelogMode(
      sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
    // candidates are listed in order of preference
    val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
      Seq(onlyAfter, beforeAndAfter)
    } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
      Seq(beforeAndAfter)
    } else {
      Seq(UpdateKindTrait.NONE)
    }
    visitSink(sink, sinkRequiredTraits)
  case sink: StreamExecLegacySink[_] =>
    val childModifyKindSet = getModifyKindSet(sink.getInput)
    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
    val sinkRequiredTraits = sink.sink match {
      case _: UpsertStreamTableSink[_] =>
        // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
        Seq(onlyAfter, beforeAndAfter)
      case _: RetractStreamTableSink[_] =>
        Seq(beforeAndAfter)
      case _: AppendStreamTableSink[_] | _: StreamTableSink[_] =>
        Seq(UpdateKindTrait.NONE)
      case ds: DataStreamTableSink[_] =>
        if (ds.withChangeFlag) {
          if (ds.needUpdateBefore) {
            Seq(beforeAndAfter)
          } else {
            // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
            Seq(onlyAfter, beforeAndAfter)
          }
        } else {
          Seq(UpdateKindTrait.NONE)
        }
    }
    visitSink(sink, sinkRequiredTraits)
  case _: StreamExecGroupAggregate | _: StreamExecGroupTableAggregate |
       _: StreamExecLimit =>
    // Aggregate, TableAggregate and Limit requires update_before if there are updates
    val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
    val children = visitChildren(rel, requiredChildTrait)
    // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
    createNewNode(rel, children, requiredTrait)
  case _: StreamExecGroupWindowAggregate | _: StreamExecGroupWindowTableAggregate |
       _: StreamExecDeduplicate | _: StreamExecTemporalSort | _: StreamExecMatch |
       _: StreamExecOverAggregate | _: StreamExecIntervalJoin =>
    // WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate
    // and IntervalJoin require nothing about UpdateKind.
    val children = visitChildren(rel, UpdateKindTrait.NONE)
    createNewNode(rel, children, requiredTrait)
  case rank: StreamExecRank =>
    val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
      rank, rank.partitionKey, rank.orderKey)
    visitRankStrategies(rankStrategies, requiredTrait, rankStrategy => rank.copy(rankStrategy))
  case sortLimit: StreamExecSortLimit =>
    val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
      sortLimit, ImmutableBitSet.of(), sortLimit.getCollation)
    visitRankStrategies(
      rankStrategies,
      requiredTrait,
      rankStrategy => sortLimit.copy(rankStrategy))
  case sort: StreamExecSort =>
    val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(sort.getInput))
    val children = visitChildren(sort, requiredChildTrait)
    createNewNode(sort, children, requiredTrait)
  case join: StreamExecJoin =>
    val requiredUpdateBeforeByParent = requiredTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
    val children = join.getInputs.zipWithIndex.map {
      case (child, childOrdinal) =>
        val physicalChild = child.asInstanceOf[StreamPhysicalRel]
        // UPDATE_BEFORE is required from an input whose unique key does not contain the
        // join key, or whenever the parent itself requires UPDATE_BEFORE
        val needUpdateBefore = !join.inputUniqueKeyContainsJoinKey(childOrdinal)
        val inputModifyKindSet = getModifyKindSet(physicalChild)
        val childRequiredTrait = if (needUpdateBefore || requiredUpdateBeforeByParent) {
          beforeAfterOrNone(inputModifyKindSet)
        } else {
          onlyAfterOrNone(inputModifyKindSet)
        }
        this.visit(physicalChild, childRequiredTrait)
    }
    if (children.exists(_.isEmpty)) {
      None
    } else {
      createNewNode(join, Some(children.flatten.toList), requiredTrait)
    }
  case temporalJoin: StreamExecTemporalJoin =>
    // forward required mode to left input
    val left = temporalJoin.getLeft.asInstanceOf[StreamPhysicalRel]
    val right = temporalJoin.getRight.asInstanceOf[StreamPhysicalRel]
    val newLeftOption = this.visit(left, requiredTrait)
    // currently temporal join only support insert-only source as the right side
    // so it requires nothing about UpdateKind
    val newRightOption = this.visit(right, UpdateKindTrait.NONE)
    (newLeftOption, newRightOption) match {
      case (Some(newLeft), Some(newRight)) =>
        // the join provides whatever update kind its (rewritten) left input provides
        val leftTrait = newLeft.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
        createNewNode(temporalJoin, Some(List(newLeft, newRight)), leftTrait)
      case _ =>
        None
    }
  case calc: StreamExecCalcBase =>
    if (requiredTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
        calc.getProgram.getCondition != null) {
      // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
      // to solve the bad case like a single 'cnt < 10' condition after aggregation.
      // See FLINK-9528.
      None
    } else {
      // otherwise, forward UpdateKind requirement
      visitChildren(rel, requiredTrait) match {
        case None => None
        case Some(children) =>
          val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
          createNewNode(rel, Some(children), childTrait)
      }
    }
  case _: StreamExecCorrelate | _: StreamExecPythonCorrelate | _: StreamExecLookupJoin |
       _: StreamExecExchange | _: StreamExecExpand | _: StreamExecMiniBatchAssigner |
       _: StreamExecWatermarkAssigner =>
    // transparent forward requiredTrait to children
    visitChildren(rel, requiredTrait) match {
      case None => None
      case Some(children) =>
        val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
        createNewNode(rel, Some(children), childTrait)
    }
  case union: StreamExecUnion =>
    val children = union.getInputs.map {
      case child: StreamPhysicalRel =>
        val childModifyKindSet = getModifyKindSet(child)
        val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
          UpdateKindTrait.NONE
        } else {
          requiredTrait
        }
        this.visit(child, requiredChildTrait)
    }.toList
    if (children.exists(_.isEmpty)) {
      None
    } else {
      val updateKinds = children.flatten
        .map(_.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE))
      // union can just forward changes, can't actively satisfy to another changelog mode
      val providedTrait: Option[UpdateKindTrait] =
        if (updateKinds.forall(k => UpdateKindTrait.NONE == k)) {
          // if all the children are NO_UPDATE, union is NO_UPDATE
          Some(UpdateKindTrait.NONE)
        } else {
          // otherwise, merge update kinds; None marks inputs that cannot be merged.
          // UpdateKind.NONE is neutral for the merge, so it is used as the seed.
          updateKinds
            .map(_.updateKind)
            .foldLeft[Option[UpdateKind]](Some(UpdateKind.NONE)) {
              case (None, _) => None
              case (Some(UpdateKind.NONE), kind) => Some(kind)
              case (merged, UpdateKind.NONE) => merged
              case (Some(l), r) if l == r => Some(l)
              // UNION doesn't support to union ONLY_UPDATE_AFTER and BEFORE_AND_AFTER inputs
              case _ => None
            }
            .map(kind => new UpdateKindTrait(kind))
        }
      providedTrait.flatMap(provided => createNewNode(union, Some(children.flatten), provided))
    }
  case ts: StreamExecTableSourceScan =>
    // currently only support BEFORE_AND_AFTER if source produces updates
    val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
    if (providedTrait == UpdateKindTrait.ONLY_UPDATE_AFTER) {
      throw new UnsupportedOperationException(
        "Currently, ScanTableSource doesn't support producing ChangelogMode " +
          "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please update the " +
          "implementation of '" + ts.tableSource.asSummaryString() + "' source.")
    }
    createNewNode(rel, Some(List()), providedTrait)
  case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
       _: StreamExecValues =>
    createNewNode(rel, Some(List()), UpdateKindTrait.NONE)
  case scan: StreamExecIntermediateTableScan =>
    val providedTrait = if (scan.intermediateTable.isUpdateBeforeRequired) {
      // we can't drop UPDATE_BEFORE if it is required by other parent blocks
      UpdateKindTrait.BEFORE_AND_AFTER
    } else {
      requiredTrait
    }
    if (!providedTrait.satisfies(requiredTrait)) {
      // require ONLY_AFTER but can only provide BEFORE_AND_AFTER
      None
    } else {
      createNewNode(rel, Some(List()), providedTrait)
    }
  case _ =>
    throw new UnsupportedOperationException(
      s"Unsupported visit for ${rel.getClass.getSimpleName}")
}