in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala [259:439]
/**
 * Builds the runtime [[Transformation]] that executes this lookup join.
 *
 * The node is validated first. Depending on `isAsyncEnabled`, either an
 * [[AsyncWaitOperatorFactory]] wrapping an async lookup runner or a
 * [[SimpleOperatorFactory]] wrapping a [[ProcessOperator]] with a sync lookup runner
 * is created. When `calcOnTemporalTable` is defined, the "WithCalc" runner variant is used
 * and the right side row type is taken from the calc program's output row type; otherwise
 * the right side row type is the table source row type.
 *
 * @param inputTransformation upstream transformation; its parallelism is reused for the
 *                            resulting transformation
 * @param env                 stream environment; read for the object-reuse flag on the
 *                            sync path
 * @param config              table config handed to the code generators and used to read the
 *                            async lookup buffer capacity and timeout
 * @param relBuilder          supplies the type factory passed to the lookup function code
 *                            generator
 * @return a one-input transformation producing rows of this node's row type
 */
def translateToPlanInternal(
    inputTransformation: Transformation[RowData],
    env: StreamExecutionEnvironment,
    config: TableConfig,
    relBuilder: RelBuilder): Transformation[RowData] = {

  val leftRowType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
  val sourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType)
  val joinedRowType = FlinkTypeFactory.toLogicalRowType(getRowType)
  val fetchedTypeInfo = fromDataTypeToTypeInfo(getLookupFunctionProducedType)

  // check that this node is valid and supported before building any operator
  validate(leftRowType, sourceRowType, allLookupKeys, joinType)

  // logical types of the lookup keys, in lookup-key order
  val keyDataTypes = lookupKeyIndicesInOrder
    .map(keyIndex => temporalTableSchema.getFieldDataTypes()(keyIndex))
  val keyLogicalTypes = keyDataTypes.map(fromDataTypeToLogicalType)

  val isLeftOuter = joinType == JoinRelType.LEFT

  val joinOperatorFactory = if (isAsyncEnabled) {
    // async join
    val bufferCapacity = config.getConfiguration
      .getInteger(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY)
    val timeoutMillis = getMillisecondFromConfigDuration(
      config,
      ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)

    val asyncFunction = lookupFunction.asInstanceOf[AsyncTableFunction[_]]

    // the declared result type has to agree with the one extracted from the class
    val declaredResultType = asyncFunction.getResultType
    val extractedResultType = TypeExtractor.createTypeInfo(
      asyncFunction,
      classOf[AsyncTableFunction[_]],
      asyncFunction.getClass,
      0)
    checkUdtfReturnType(declaredResultType, extractedResultType)

    // the eval signature checked here is the future type followed by the lookup key types
    val futureType = new TypeInformationRawType(
      new GenericTypeInfo(classOf[CompletableFuture[_]]))
    val evalParameterTypes = Array(futureType) ++ keyLogicalTypes
    checkEvalMethodSignature(asyncFunction, evalParameterTypes, extractedResultType)

    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    val asyncFetcher = LookupJoinCodeGenerator.generateAsyncLookupFunction(
      config,
      typeFactory,
      leftRowType,
      joinedRowType,
      fetchedTypeInfo,
      lookupKeyIndicesInOrder,
      allLookupKeys,
      asyncFunction)

    val rightRowType = calcOnTemporalTable match {
      // a projection or filter sits after the table source scan
      case Some(calcProgram) =>
        FlinkTypeFactory.toLogicalRowType(calcProgram.getOutputRowType)
      // no calc after the temporal table: right side is the table source row type
      case None =>
        sourceRowType
    }

    val resultFuture = LookupJoinCodeGenerator.generateTableAsyncCollector(
      config,
      "TableFunctionResultFuture",
      leftRowType,
      rightRowType,
      remainingCondition)

    val asyncRunner = calcOnTemporalTable match {
      case Some(_) =>
        val calcFunction = generateCalcMapFunction(
          config,
          calcOnTemporalTable,
          sourceRowType)
        new AsyncLookupJoinWithCalcRunner(
          asyncFetcher,
          calcFunction,
          resultFuture,
          fetchedTypeInfo,
          InternalTypeInfo.of(rightRowType),
          isLeftOuter,
          bufferCapacity)
      case None =>
        new AsyncLookupJoinRunner(
          asyncFetcher,
          resultFuture,
          fetchedTypeInfo,
          InternalTypeInfo.of(rightRowType),
          isLeftOuter,
          bufferCapacity)
    }

    // ORDERED output mode is forced for now; it could be relaxed to UNORDERED
    // when the downstream does not rely on ordering
    new AsyncWaitOperatorFactory(
      asyncRunner,
      timeoutMillis,
      bufferCapacity,
      OutputMode.ORDERED)
  } else {
    // sync join
    val syncFunction = lookupFunction.asInstanceOf[TableFunction[_]]

    // the declared result type has to agree with the one extracted from the class
    val declaredResultType = syncFunction.getResultType
    val extractedResultType = TypeExtractor.createTypeInfo(
      syncFunction,
      classOf[TableFunction[_]],
      syncFunction.getClass,
      0)
    checkUdtfReturnType(declaredResultType, extractedResultType)
    checkEvalMethodSignature(syncFunction, keyLogicalTypes, extractedResultType)

    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    val syncFetcher = LookupJoinCodeGenerator.generateLookupFunction(
      config,
      typeFactory,
      leftRowType,
      joinedRowType,
      fetchedTypeInfo,
      lookupKeyIndicesInOrder,
      allLookupKeys,
      syncFunction,
      env.getConfig.isObjectReuseEnabled)

    val codeGenCtx = CodeGeneratorContext(config)

    val rightRowType = calcOnTemporalTable match {
      // a projection or filter sits after the table source scan
      case Some(calcProgram) =>
        FlinkTypeFactory.toLogicalRowType(calcProgram.getOutputRowType)
      // no calc after the temporal table: right side is the table source row type
      case None =>
        sourceRowType
    }

    val joinCollector = generateCollector(
      codeGenCtx,
      leftRowType,
      rightRowType,
      joinedRowType,
      remainingCondition,
      None)

    val syncRunner = calcOnTemporalTable match {
      case Some(_) =>
        val calcFunction = generateCalcMapFunction(
          config,
          calcOnTemporalTable,
          sourceRowType)
        new LookupJoinWithCalcRunner(
          syncFetcher,
          calcFunction,
          joinCollector,
          isLeftOuter,
          rightRowType.getFieldCount)
      case None =>
        new LookupJoinRunner(
          syncFetcher,
          joinCollector,
          isLeftOuter,
          rightRowType.getFieldCount)
    }

    SimpleOperatorFactory.of(new ProcessOperator(syncRunner))
  }

  // the join operator keeps the parallelism of its input
  ExecNode.createOneInputTransformation(
    inputTransformation,
    getRelDetailedDescription,
    joinOperatorFactory,
    InternalTypeInfo.of(joinedRowType),
    inputTransformation.getParallelism)
}