def translateToPlanInternal()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala [259:439]


  /**
    * Builds the [[Transformation]] that executes this lookup join on top of the given input.
    *
    * When `isAsyncEnabled` is set, the lookup function is treated as an [[AsyncTableFunction]]
    * and wrapped in an [[AsyncWaitOperatorFactory]] using `OutputMode.ORDERED`; otherwise it is
    * treated as a [[TableFunction]] and wrapped in a [[ProcessOperator]]. In both modes a
    * dedicated "with calc" runner is used when `calcOnTemporalTable` is defined.
    *
    * @param inputTransformation the upstream transformation; its parallelism is reused for
    *                            the resulting transformation
    * @param env                 the execution environment; its object-reuse flag is passed to
    *                            the generator of the synchronous lookup function
    * @param config              the table config used for code generation and for reading the
    *                            async lookup buffer capacity and timeout
    * @param relBuilder          supplies the type factory handed to the lookup function
    *                            code generators
    * @return the one-input transformation producing the joined rows
    */
  def translateToPlanInternal(
      inputTransformation: Transformation[RowData],
      env: StreamExecutionEnvironment,
      config: TableConfig,
      relBuilder: RelBuilder): Transformation[RowData] = {

    val leftRowType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
    val sourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType)
    val outputRowType = FlinkTypeFactory.toLogicalRowType(getRowType)
    val fetcherOutputTypeInfo = fromDataTypeToTypeInfo(getLookupFunctionProducedType)

    // Check that the node is valid and supported before generating any code.
    validate(leftRowType, sourceRowType, allLookupKeys, joinType)

    // Logical types of the lookup key fields, following lookupKeyIndicesInOrder.
    val lookupKeyTypes = lookupKeyIndicesInOrder.map { keyIndex =>
      fromDataTypeToLogicalType(temporalTableSchema.getFieldDataTypes()(keyIndex))
    }

    val isLeftOuterJoin = joinType == JoinRelType.LEFT

    // Row type of the lookup side as consumed by the join: the calc's output row type when a
    // projection or filter follows the table source scan, otherwise the table source row type.
    def lookupSideRowType = calcOnTemporalTable match {
      case Some(calcProgram) =>
        FlinkTypeFactory.toLogicalRowType(calcProgram.getOutputRowType)
      case None =>
        sourceRowType
    }

    val operatorFactory = if (isAsyncEnabled) {
      // async join
      val bufferCapacity = config.getConfiguration.getInteger(
        ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY)
      val timeoutMillis = getMillisecondFromConfigDuration(
        config,
        ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)

      val asyncFunction = lookupFunction.asInstanceOf[AsyncTableFunction[_]]

      // Return type validity check, given the declared result type and the extracted one.
      val declaredResultType = asyncFunction.getResultType
      val reflectedResultTypeInfo = TypeExtractor.createTypeInfo(
        asyncFunction,
        classOf[AsyncTableFunction[_]],
        asyncFunction.getClass,
        0)
      checkUdtfReturnType(declaredResultType, reflectedResultTypeInfo)

      // The eval signature is checked with a CompletableFuture parameter placed in front of
      // the lookup key types.
      val completableFutureType = new TypeInformationRawType(
        new GenericTypeInfo(classOf[CompletableFuture[_]]))
      val evalParameterTypes = Array(completableFutureType) ++ lookupKeyTypes
      checkEvalMethodSignature(asyncFunction, evalParameterTypes, reflectedResultTypeInfo)

      val fetcherCode = LookupJoinCodeGenerator.generateAsyncLookupFunction(
        config,
        relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory],
        leftRowType,
        outputRowType,
        fetcherOutputTypeInfo,
        lookupKeyIndicesInOrder,
        allLookupKeys,
        asyncFunction)

      val rightRowType = lookupSideRowType
      val resultFutureCode = LookupJoinCodeGenerator.generateTableAsyncCollector(
        config,
        "TableFunctionResultFuture",
        leftRowType,
        rightRowType,
        remainingCondition)

      val asyncRunner = calcOnTemporalTable match {
        case Some(_) =>
          // a projection or filter after table source scan
          val calcCode = generateCalcMapFunction(config, calcOnTemporalTable, sourceRowType)
          new AsyncLookupJoinWithCalcRunner(
            fetcherCode,
            calcCode,
            resultFutureCode,
            fetcherOutputTypeInfo,
            InternalTypeInfo.of(rightRowType),
            isLeftOuterJoin,
            bufferCapacity)
        case None =>
          // no calc after the temporal table, the fetched rows are joined as they are
          new AsyncLookupJoinRunner(
            fetcherCode,
            resultFutureCode,
            fetcherOutputTypeInfo,
            InternalTypeInfo.of(rightRowType),
            isLeftOuterJoin,
            bufferCapacity)
      }

      // ORDERED output mode is forced for now; it could be relaxed to UNORDERED
      // when the downstream does not depend on ordering.
      new AsyncWaitOperatorFactory(asyncRunner, timeoutMillis, bufferCapacity, OutputMode.ORDERED)
    } else {
      // sync join
      val syncFunction = lookupFunction.asInstanceOf[TableFunction[_]]

      // Return type validity check, given the declared result type and the extracted one.
      val declaredResultType = syncFunction.getResultType
      val reflectedResultTypeInfo = TypeExtractor.createTypeInfo(
        syncFunction,
        classOf[TableFunction[_]],
        syncFunction.getClass,
        0)
      checkUdtfReturnType(declaredResultType, reflectedResultTypeInfo)
      checkEvalMethodSignature(syncFunction, lookupKeyTypes, reflectedResultTypeInfo)

      val fetcherCode = LookupJoinCodeGenerator.generateLookupFunction(
        config,
        relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory],
        leftRowType,
        outputRowType,
        fetcherOutputTypeInfo,
        lookupKeyIndicesInOrder,
        allLookupKeys,
        syncFunction,
        env.getConfig.isObjectReuseEnabled)

      val codeGenContext = CodeGeneratorContext(config)
      val rightRowType = lookupSideRowType
      val collectorCode = generateCollector(
        codeGenContext,
        leftRowType,
        rightRowType,
        outputRowType,
        remainingCondition,
        None)

      val syncRunner = calcOnTemporalTable match {
        case Some(_) =>
          // a projection or filter after table source scan
          val calcCode = generateCalcMapFunction(config, calcOnTemporalTable, sourceRowType)
          new LookupJoinWithCalcRunner(
            fetcherCode,
            calcCode,
            collectorCode,
            isLeftOuterJoin,
            rightRowType.getFieldCount)
        case None =>
          // no calc after the temporal table, the fetched rows are joined as they are
          new LookupJoinRunner(
            fetcherCode,
            collectorCode,
            isLeftOuterJoin,
            rightRowType.getFieldCount)
      }

      SimpleOperatorFactory.of(new ProcessOperator(syncRunner))
    }

    // The join operator keeps the parallelism of its input.
    ExecNode.createOneInputTransformation(
      inputTransformation,
      getRelDetailedDescription,
      operatorFactory,
      InternalTypeInfo.of(outputRowType),
      inputTransformation.getParallelism)
  }