in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala [348:576]
def generateResultExpression(
    fieldExprs: Seq[GeneratedExpression],
    returnType: TypeInformation[_ <: Any],
    resultFieldNames: Seq[String])
  : GeneratedExpression = {
  // Combines the given field expressions into one expression of type `returnType`.
  //
  // fieldExprs       - one expression per result field
  // returnType       - target type; its arity must equal fieldExprs.length
  // resultFieldNames - one name per expression; used to address POJO fields by name
  //
  // Throws CodeGenException if the arities differ or if an expression's type does not
  // equal the type of the result field it is written to.

  // initial type check
  if (returnType.getArity != fieldExprs.length) {
    throw new CodeGenException(
      s"Arity [${returnType.getArity}] of result type [$returnType] does not match " +
        s"number [${fieldExprs.length}] of expressions [$fieldExprs].")
  }
  if (resultFieldNames.length != fieldExprs.length) {
    throw new CodeGenException(
      s"Arity [${resultFieldNames.length}] of result field names [$resultFieldNames] does not " +
        s"match number [${fieldExprs.length}] of expressions [$fieldExprs].")
  }

  // type check
  checkResultFieldTypes(fieldExprs, returnType, resultFieldNames)

  val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
  val boxedFieldExprs = fieldExprs.map(generateOutputFieldBoxing)

  // generate result expression
  returnType match {
    case ri: RowTypeInfo =>
      addReusableOutRecord(ri)
      val resultSetters = boxedFieldExprs.zipWithIndex.map { case (fieldExpr, i) =>
        generateNullSafeFieldSetter(
          fieldExpr,
          s"$outRecordTerm.setField($i, null)",
          s"$outRecordTerm.setField($i, ${fieldExpr.resultTerm})")
      }
      GeneratedExpression(outRecordTerm, NEVER_NULL, generateCodeSplits(resultSetters), returnType)

    case pt: PojoTypeInfo[_] =>
      addReusableOutRecord(pt)
      val resultSetters = boxedFieldExprs.zip(resultFieldNames).map {
        case (fieldExpr, fieldName) => generatePojoFieldSetter(pt, fieldExpr, fieldName)
      }
      GeneratedExpression(outRecordTerm, NEVER_NULL, generateCodeSplits(resultSetters), returnType)

    case tup: TupleTypeInfo[_] =>
      addReusableOutRecord(tup)
      val resultSetters = boxedFieldExprs.zipWithIndex.map { case (fieldExpr, i) =>
        val fieldName = s"f$i"
        // a null field makes the generated code throw instead of storing it
        generateNullSafeFieldSetter(
          fieldExpr,
          """throw new NullPointerException("Null result cannot be stored in a Tuple.")""",
          s"$outRecordTerm.$fieldName = ${fieldExpr.resultTerm}")
      }
      GeneratedExpression(outRecordTerm, NEVER_NULL, generateCodeSplits(resultSetters), returnType)

    case _: CaseClassTypeInfo[_] =>
      // all fields are evaluated first and then handed to the constructor in a single call
      val fieldCodes: String = boxedFieldExprs.map(_.code).mkString("\n")
      val constructorParams: String = boxedFieldExprs.map(_.resultTerm).mkString(", ")
      val resultTerm = newName(outRecordTerm)

      val nullCheckCode = if (nullCheck) {
        boxedFieldExprs.map { fieldExpr =>
          s"""
            |if (${fieldExpr.nullTerm}) {
            | throw new NullPointerException("Null result cannot be stored in a Case Class.");
            |}
            |""".stripMargin
        }.mkString("\n")
      } else {
        ""
      }

      val resultCode =
        s"""
          |$fieldCodes
          |$nullCheckCode
          |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams);
          |""".stripMargin

      // case classes are not splittable
      GeneratedExpression(resultTerm, NEVER_NULL, resultCode, returnType)

    case _: TypeInformation[_] =>
      // the single field expression itself is the result
      val fieldExpr = boxedFieldExprs.head

      val nullCheckCode = if (nullCheck) {
        s"""
          |if (${fieldExpr.nullTerm}) {
          | throw new NullPointerException("Null result cannot be used for atomic types.");
          |}
          |""".stripMargin
      } else {
        ""
      }

      val resultCode =
        s"""
          |${fieldExpr.code}
          |$nullCheckCode
          |""".stripMargin

      // other types are not splittable
      GeneratedExpression(fieldExpr.resultTerm, fieldExpr.nullTerm, resultCode, returnType)

    case _ =>
      throw new CodeGenException(s"Unsupported result type: $returnType")
  }
}

/**
  * Verifies that the type of every field expression equals the type of the result field it
  * is written to.
  *
  * POJO fields are looked up by name, fields of other composite types by position. Any other
  * result type is compared directly with the type of the first expression.
  *
  * @param fieldExprs field expressions to validate
  * @param returnType result type the expressions are written into
  * @param resultFieldNames result field names, positionally aligned with fieldExprs
  * @throws CodeGenException on the first mismatch
  */
private def checkResultFieldTypes(
    fieldExprs: Seq[GeneratedExpression],
    returnType: TypeInformation[_ <: Any],
    resultFieldNames: Seq[String])
  : Unit = {

  def incompatible(fieldExpr: GeneratedExpression, expected: TypeInformation[_]) =
    new CodeGenException(
      s"Incompatible types of expression and result type. Expression [$fieldExpr] type is " +
        s"[${fieldExpr.resultType}], result type is [$expected]")

  returnType match {
    case pt: PojoTypeInfo[_] =>
      // zip instead of resultFieldNames(i): positional access may be linear on a Seq
      fieldExprs.zip(resultFieldNames).foreach { case (fieldExpr, fieldName) =>
        val expected: TypeInformation[_] = pt.getTypeAt(fieldName)
        if (fieldExpr.resultType != expected) {
          throw incompatible(fieldExpr, expected)
        }
      }

    case ct: CompositeType[_] =>
      fieldExprs.zipWithIndex.foreach { case (fieldExpr, i) =>
        val expected: TypeInformation[_] = ct.getTypeAt(i)
        if (fieldExpr.resultType != expected) {
          throw incompatible(fieldExpr, expected)
        }
      }

    case t =>
      // the arity check done by the caller leaves exactly one expression here
      val fieldExpr = fieldExprs.head
      if (t != fieldExpr.resultType) {
        throw incompatible(fieldExpr, t)
      }
  }
}

/**
  * Generates the code that evaluates a field expression and stores its result.
  *
  * With null checking enabled the emitted code runs `onNull` if the expression's null term
  * is set and `onValue` otherwise. Without null checking only `onValue` is emitted.
  * Both statements are passed by name, so `onNull` is not evaluated at all when null checking
  * is disabled.
  *
  * @param fieldExpr expression whose code is emitted before the store
  * @param onNull Java statement (without trailing semicolon) for a null result
  * @param onValue Java statement (without trailing semicolon) for a non-null result
  * @return generated code fragment
  */
private def generateNullSafeFieldSetter(
    fieldExpr: GeneratedExpression,
    onNull: => String,
    onValue: => String)
  : String = {
  if (nullCheck) {
    s"""
      |${fieldExpr.code}
      |if (${fieldExpr.nullTerm}) {
      | $onNull;
      |}
      |else {
      | $onValue;
      |}
      |""".stripMargin
  } else {
    s"""
      |${fieldExpr.code}
      |$onValue;
      |""".stripMargin
  }
}

/**
  * Generates the code that writes one field expression into the POJO out record.
  *
  * @param pt type information of the POJO
  * @param fieldExpr expression providing the field value
  * @param fieldName name of the POJO field to write
  * @return generated code fragment
  */
private def generatePojoFieldSetter(
    pt: PojoTypeInfo[_],
    fieldExpr: GeneratedExpression,
    fieldName: String)
  : String = {
  getFieldAccessor(pt.getTypeClass, fieldName) match {
    // Reflective access of primitives/Objects
    case ObjectPrivateFieldAccessor(field) =>
      val fieldTerm = addReusablePrivateFieldAccess(pt.getTypeClass, fieldName)
      // a primitive field cannot hold null, so a null result is stored as the type's default
      val defaultIfNull =
        if (isFieldPrimitive(field)) primitiveDefaultValue(fieldExpr.resultType) else "null"
      generateNullSafeFieldSetter(
        fieldExpr,
        reflectiveFieldWriteAccess(fieldTerm, field, outRecordTerm, defaultIfNull),
        reflectiveFieldWriteAccess(fieldTerm, field, outRecordTerm, fieldExpr.resultTerm))

    // primitive or Object field access (implicit boxing)
    case _ =>
      // NOTE(review): unlike the reflective branch, a null result is assigned as a literal
      // `null` here; this presumably would not compile for a primitive field - confirm
      // whether getFieldAccessor can select this branch for primitive fields.
      generateNullSafeFieldSetter(
        fieldExpr,
        s"$outRecordTerm.$fieldName = null",
        s"$outRecordTerm.$fieldName = ${fieldExpr.resultTerm}")
  }
}