def visit()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [393:607]


    /**
     * Visits the given physical node, recursively visits its inputs with the
     * [[UpdateKindTrait]] each of them is required to provide, and rebuilds the node
     * via `createNewNode` with the [[UpdateKindTrait]] this node provides.
     *
     * @param rel the stream physical node to visit
     * @param requiredTrait the [[UpdateKindTrait]] required from this node by its parent
     * @return the result of `createNewNode` for the rebuilt node, or `None` when the
     *         requirement can't be satisfied by this node or by one of its inputs
     * @throws UnsupportedOperationException if the node type is not handled here, or if a
     *         table source reports a changelog mode that maps to ONLY_UPDATE_AFTER
     */
    def visit(
        rel: StreamPhysicalRel,
        requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match {
      case sink: StreamExecSink =>
        val childModifyKindSet = getModifyKindSet(sink.getInput)
        val onlyAfter = onlyAfterOrNone(childModifyKindSet)
        val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
        // derive the candidate traits from the changelog mode the sink reports
        // for the changes produced by its input
        val sinkTrait = UpdateKindTrait.fromChangelogMode(
          sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
        val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
          Seq(onlyAfter, beforeAndAfter)
        } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
          Seq(beforeAndAfter)
        } else {
          Seq(UpdateKindTrait.NONE)
        }
        visitSink(sink, sinkRequiredTraits)

      case sink: StreamExecLegacySink[_] =>
        val childModifyKindSet = getModifyKindSet(sink.getInput)
        val onlyAfter = onlyAfterOrNone(childModifyKindSet)
        val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
        val sinkRequiredTraits = sink.sink match {
          case _: UpsertStreamTableSink[_] =>
            // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
            Seq(onlyAfter, beforeAndAfter)
          case _: RetractStreamTableSink[_] =>
            Seq(beforeAndAfter)
          case _: AppendStreamTableSink[_] | _: StreamTableSink[_] =>
            Seq(UpdateKindTrait.NONE)
          case ds: DataStreamTableSink[_] =>
            if (ds.withChangeFlag) {
              if (ds.needUpdateBefore) {
                Seq(beforeAndAfter)
              } else {
                // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
                Seq(onlyAfter, beforeAndAfter)
              }
            } else {
              Seq(UpdateKindTrait.NONE)
            }
        }
        visitSink(sink, sinkRequiredTraits)

      case _: StreamExecGroupAggregate | _: StreamExecGroupTableAggregate |
           _: StreamExecLimit =>
        // Aggregate, TableAggregate and Limit requires update_before if there are updates
        val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
        val children = visitChildren(rel, requiredChildTrait)
        // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
        createNewNode(rel, children, requiredTrait)

      case _: StreamExecGroupWindowAggregate | _: StreamExecGroupWindowTableAggregate |
           _: StreamExecDeduplicate | _: StreamExecTemporalSort | _: StreamExecMatch |
           _: StreamExecOverAggregate | _: StreamExecIntervalJoin =>
        // WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate
        // and IntervalJoin require nothing about UpdateKind.
        val children = visitChildren(rel, UpdateKindTrait.NONE)
        createNewNode(rel, children, requiredTrait)

      case rank: StreamExecRank =>
        val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
          rank, rank.partitionKey, rank.orderKey)
        visitRankStrategies(rankStrategies, requiredTrait, rankStrategy => rank.copy(rankStrategy))

      case sortLimit: StreamExecSortLimit =>
        // no partition key for SortLimit, hence the empty bit set
        val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
          sortLimit, ImmutableBitSet.of(), sortLimit.getCollation)
        visitRankStrategies(
          rankStrategies,
          requiredTrait,
          rankStrategy => sortLimit.copy(rankStrategy))

      case sort: StreamExecSort =>
        val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(sort.getInput))
        val children = visitChildren(sort, requiredChildTrait)
        createNewNode(sort, children, requiredTrait)

      case join: StreamExecJoin =>
        val requiredUpdateBeforeByParent = requiredTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
        val children = join.getInputs.zipWithIndex.map {
          case (child, childOrdinal) =>
            val physicalChild = child.asInstanceOf[StreamPhysicalRel]
            // an input must provide UPDATE_BEFORE when its unique key doesn't contain the
            // join key, or when the parent of the join requires UPDATE_BEFORE
            val needUpdateBefore = !join.inputUniqueKeyContainsJoinKey(childOrdinal)
            val inputModifyKindSet = getModifyKindSet(physicalChild)
            val childRequiredTrait = if (needUpdateBefore || requiredUpdateBeforeByParent) {
              beforeAfterOrNone(inputModifyKindSet)
            } else {
              onlyAfterOrNone(inputModifyKindSet)
            }
            this.visit(physicalChild, childRequiredTrait)
        }
        if (children.exists(_.isEmpty)) {
          None
        } else {
          createNewNode(join, Some(children.flatten.toList), requiredTrait)
        }

      case temporalJoin: StreamExecTemporalJoin =>
        // forward required mode to left input
        val left = temporalJoin.getLeft.asInstanceOf[StreamPhysicalRel]
        val right = temporalJoin.getRight.asInstanceOf[StreamPhysicalRel]
        val newLeftOption = this.visit(left, requiredTrait)
        // currently temporal join only support insert-only source as the right side
        // so it requires nothing about UpdateKind
        val newRightOption = this.visit(right, UpdateKindTrait.NONE)
        (newLeftOption, newRightOption) match {
          case (Some(newLeft), Some(newRight)) =>
            // the node provides whatever its new left input provides
            val leftTrait = newLeft.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
            createNewNode(temporalJoin, Some(List(newLeft, newRight)), leftTrait)
          case _ =>
            None
        }

      case calc: StreamExecCalcBase =>
        if (requiredTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
            calc.getProgram.getCondition != null) {
          // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
          // to solve the bad case like a single 'cnt < 10' condition after aggregation.
          // See FLINK-9528.
          None
        } else {
          // otherwise, forward UpdateKind requirement
          visitChildren(rel, requiredTrait) match {
            case None => None
            case Some(children) =>
              val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
              createNewNode(rel, Some(children), childTrait)
          }
        }

      case _: StreamExecCorrelate | _: StreamExecPythonCorrelate | _: StreamExecLookupJoin |
           _: StreamExecExchange | _: StreamExecExpand | _: StreamExecMiniBatchAssigner |
           _: StreamExecWatermarkAssigner =>
        // transparent forward requiredTrait to children
        visitChildren(rel, requiredTrait) match {
          case None => None
          case Some(children) =>
            val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
            createNewNode(rel, Some(children), childTrait)
        }

      case union: StreamExecUnion =>
        val children = union.getInputs.map {
          case child: StreamPhysicalRel =>
            val childModifyKindSet = getModifyKindSet(child)
            val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
              UpdateKindTrait.NONE
            } else {
              requiredTrait
            }
            this.visit(child, requiredChildTrait)
        }.toList

        if (children.exists(_.isEmpty)) {
          None
        } else {
          val newChildren = children.flatten
          val updateKinds = newChildren
            .map(_.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE))
          // union can just forward changes, can't actively satisfy to another changelog mode
          val providedTraitOption: Option[UpdateKindTrait] =
            if (updateKinds.forall(k => UpdateKindTrait.NONE == k)) {
              // if all the children are NO_UPDATE, union is NO_UPDATE
              Some(UpdateKindTrait.NONE)
            } else {
              // otherwise, merge update kinds. The accumulator becomes None (and stays None)
              // as soon as two different non-NONE kinds meet, which replaces the former
              // non-local `return None` from inside the lambda.
              updateKinds
                .map(_.updateKind)
                .foldLeft(Option[UpdateKind](UpdateKind.NONE)) {
                  case (Some(UpdateKind.NONE), kind) => Some(kind)
                  case (merged @ Some(_), UpdateKind.NONE) => merged
                  case (merged @ Some(l), r) if l == r => merged
                  // UNION doesn't support to union ONLY_UPDATE_AFTER and BEFORE_AND_AFTER inputs
                  case _ => None
                }
                .map(kind => new UpdateKindTrait(kind))
            }
          providedTraitOption.flatMap { providedTrait =>
            createNewNode(union, Some(newChildren), providedTrait)
          }
        }

      case ts: StreamExecTableSourceScan =>
        // currently only support BEFORE_AND_AFTER if source produces updates
        val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
        if (providedTrait == UpdateKindTrait.ONLY_UPDATE_AFTER) {
          throw new UnsupportedOperationException(
            "Currently, ScanTableSource doesn't support producing ChangelogMode " +
              "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please update the " +
              "implementation of '" + ts.tableSource.asSummaryString() + "' source.")
        }
        createNewNode(rel, Some(List()), providedTrait)

      case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
           _: StreamExecValues =>
        // leaf nodes without children; they are rebuilt with UpdateKindTrait.NONE
        createNewNode(rel, Some(List()), UpdateKindTrait.NONE)

      case scan: StreamExecIntermediateTableScan =>
        val providedTrait = if (scan.intermediateTable.isUpdateBeforeRequired) {
          // we can't drop UPDATE_BEFORE if it is required by other parent blocks
          UpdateKindTrait.BEFORE_AND_AFTER
        } else {
          requiredTrait
        }
        if (!providedTrait.satisfies(requiredTrait)) {
          // require ONLY_AFTER but can only provide BEFORE_AND_AFTER
          None
        } else {
          createNewNode(rel, Some(List()), providedTrait)
        }

      case _ =>
        throw new UnsupportedOperationException(
          s"Unsupported visit for ${rel.getClass.getSimpleName}")

    }