in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [112:290]
def visit(
rel: StreamPhysicalRel,
requiredTrait: ModifyKindSetTrait,
requester: String): StreamPhysicalRel = rel match {
case sink: StreamExecSink =>
val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
sink.tableSink.getChangelogMode(queryModifyKindSet))
val children = visitChildren(sink, sinkRequiredTrait, name)
val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
// ignore required trait from context, because sink is the true root
sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]
case sink: StreamExecLegacySink[_] =>
val (sinkRequiredTrait, name) = sink.sink match {
case _: UpsertStreamTableSink[_] =>
(ModifyKindSetTrait.ALL_CHANGES, "UpsertStreamTableSink")
case _: RetractStreamTableSink[_] =>
(ModifyKindSetTrait.ALL_CHANGES, "RetractStreamTableSink")
case _: AppendStreamTableSink[_] =>
(ModifyKindSetTrait.INSERT_ONLY, "AppendStreamTableSink")
case _: StreamTableSink[_] =>
(ModifyKindSetTrait.INSERT_ONLY, "StreamTableSink")
case ds: DataStreamTableSink[_] =>
if (ds.withChangeFlag) {
(ModifyKindSetTrait.ALL_CHANGES, "toRetractStream")
} else {
(ModifyKindSetTrait.INSERT_ONLY, "toAppendStream")
}
case _ =>
throw new UnsupportedOperationException(
s"Unsupported sink '${sink.sink.getClass.getSimpleName}'")
}
val children = visitChildren(sink, sinkRequiredTrait, name)
val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
// ignore required trait from context, because sink is the true root
sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]
case deduplicate: StreamExecDeduplicate =>
// deduplicate only support insert only as input
val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
val providedTrait = if (deduplicate.keepLastRow) {
// produce updates if it keeps last row
ModifyKindSetTrait.ALL_CHANGES
} else {
ModifyKindSetTrait.INSERT_ONLY
}
createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)
case agg: StreamExecGroupAggregate =>
// agg support all changes in input
val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
val inputModifyKindSet = getModifyKindSet(children.head)
val builder = ModifyKindSet.newBuilder()
.addContainedKind(ModifyKind.INSERT)
.addContainedKind(ModifyKind.UPDATE)
if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||
inputModifyKindSet.contains(ModifyKind.DELETE)) {
builder.addContainedKind(ModifyKind.DELETE)
}
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(agg, children, providedTrait, requiredTrait, requester)
case tagg: StreamExecGroupTableAggregate =>
// table agg support all changes in input
val children = visitChildren(tagg, ModifyKindSetTrait.ALL_CHANGES)
// table aggregate will produce all changes, including deletions
createNewNode(
tagg, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)
case window: StreamExecGroupWindowAggregateBase =>
// WindowAggregate and WindowTableAggregate support insert-only in input
val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
val builder = ModifyKindSet.newBuilder()
.addContainedKind(ModifyKind.INSERT)
if (window.emitStrategy.produceUpdates) {
builder.addContainedKind(ModifyKind.UPDATE)
}
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(window, children, providedTrait, requiredTrait, requester)
case limit: StreamExecLimit =>
// limit support all changes in input
val children = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES)
val providedTrait = if (getModifyKindSet(children.head).isInsertOnly) {
ModifyKindSetTrait.INSERT_ONLY
} else {
ModifyKindSetTrait.ALL_CHANGES
}
createNewNode(limit, children, providedTrait, requiredTrait, requester)
case _: StreamExecRank | _: StreamExecSortLimit =>
// Rank and SortLimit supports consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
createNewNode(
rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)
case sort: StreamExecSort =>
// Sort supports consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
// Sort will buffer all inputs, and produce insert-only messages when input is finished
createNewNode(
sort, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
case cep: StreamExecMatch =>
// CEP only supports consuming insert-only and producing insert-only changes
// give a better requester name for exception message
val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
createNewNode(
cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
case _: StreamExecTemporalSort | _: StreamExecOverAggregate | _: StreamExecIntervalJoin =>
// TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
// and producing insert-only changes
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
createNewNode(
rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
case join: StreamExecJoin =>
// join support all changes in input
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
val leftKindSet = getModifyKindSet(children.head)
val rightKindSet = getModifyKindSet(children.last)
val innerOrSemi = join.flinkJoinType == FlinkJoinType.INNER ||
join.flinkJoinType == FlinkJoinType.SEMI
val providedTrait = if (innerOrSemi) {
// forward left and right modify operations
new ModifyKindSetTrait(leftKindSet.union(rightKindSet))
} else {
// otherwise, it may produce any kinds of changes
ModifyKindSetTrait.ALL_CHANGES
}
createNewNode(join, children, providedTrait, requiredTrait, requester)
case temporalJoin: StreamExecTemporalJoin =>
// currently, temporal join only support insert-only input streams, including right side
val children = visitChildren(temporalJoin, ModifyKindSetTrait.INSERT_ONLY)
// forward left input changes
val leftTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
createNewNode(temporalJoin, children, leftTrait, requiredTrait, requester)
case _: StreamExecCalc | _: StreamExecPythonCalc | _: StreamExecCorrelate |
_: StreamExecPythonCorrelate | _: StreamExecLookupJoin | _: StreamExecExchange |
_: StreamExecExpand | _: StreamExecMiniBatchAssigner |
_: StreamExecWatermarkAssigner =>
// transparent forward requiredTrait to children
val children = visitChildren(rel, requiredTrait, requester)
val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
// forward children mode
createNewNode(rel, children, childrenTrait, requiredTrait, requester)
case union: StreamExecUnion =>
// transparent forward requiredTrait to children
val children = visitChildren(rel, requiredTrait, requester)
// union provides all possible kinds of children have
val providedKindSet = ModifyKindSet.union(children.map(getModifyKindSet): _*)
createNewNode(
union, children, new ModifyKindSetTrait(providedKindSet), requiredTrait, requester)
case ts: StreamExecTableSourceScan =>
// ScanTableSource supports produces updates and deletions
val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
createNewNode(ts, List(), providedTrait, requiredTrait, requester)
case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
_: StreamExecValues =>
// DataStream, TableSource and Values only support producing insert-only messages
createNewNode(
rel, List(), ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
case scan: StreamExecIntermediateTableScan =>
val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet)
createNewNode(scan, List(), providedTrait, requiredTrait, requester)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported visit for ${rel.getClass.getSimpleName}")
}