in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala [84:267]
/**
  * Builds the conversion function used to turn a raw input object into the value
  * representation expected for `dataType`.
  *
  * The converter is selected once, up front, by inspecting `dataType`; converters for
  * composite types (arrays, maps, rows, tuples) are assembled recursively from the
  * converters of their element / key / value / field types.
  *
  * Every converter except the final fallback delegates to `nullSafeConvert`, passing a
  * partial function that lists the accepted runtime input types. How `null` inputs and
  * inputs outside that list are treated is decided by `nullSafeConvert`, not here.
  *
  * NOTE(review): the temporal branches divide the incoming number by 1000 before handing
  * it to `Time` / `Timestamp` / `Instant.ofEpochMilli`, so the input is presumably in
  * microseconds; the date branch multiplies an `Int` by 86400000, so it is presumably a
  * day count. Confirm both against the producer of these values.
  *
  * @param dataType type information describing the desired target representation
  * @return a function mapping a raw input object to its converted value; for types not
  *         handled explicitly the function returns its input unchanged
  */
private def convertTo(dataType: TypeInformation[_]): Any => Any = dataType match {
  case _ if dataType == Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
    case flag: Boolean => flag
  }

  // Integral targets accept any of Byte/Short/Int/Long. Narrowing goes through
  // toByte/toShort/toInt, which truncate out-of-range values instead of failing.
  case _ if dataType == Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
    case n: Byte => n
    case n: Short => n.toByte
    case n: Int => n.toByte
    case n: Long => n.toByte
  }

  case _ if dataType == Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
    case n: Short => n
    case n: Byte => n.toShort
    case n: Int => n.toShort
    case n: Long => n.toShort
  }

  case _ if dataType == Types.INT => (obj: Any) => nullSafeConvert(obj) {
    case n: Int => n
    case n: Byte => n.toInt
    case n: Short => n.toInt
    case n: Long => n.toInt
  }

  case _ if dataType == Types.LONG => (obj: Any) => nullSafeConvert(obj) {
    case n: Long => n
    case n: Byte => n.toLong
    case n: Short => n.toLong
    case n: Int => n.toLong
  }

  // Floating point targets only accept Float/Double inputs.
  case _ if dataType == Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
    case x: Float => x
    case x: Double => x.toFloat
  }

  case _ if dataType == Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
    case x: Double => x
    case x: Float => x.toDouble
  }

  case _ if dataType == Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
    case decimal: java.math.BigDecimal => decimal
  }

  case _ if dataType == Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
    case days: Int =>
      // Scale to milliseconds, then remove the offset reported by
      // getOffsetFromLocalMillis before constructing the Date.
      val localMillis = days.toLong * 86400000
      val utcMillis = localMillis - getOffsetFromLocalMillis(localMillis)
      new Date(utcMillis)
  }

  // The temporal branches below use integer division by 1000, so any remainder
  // below that factor is dropped.
  case _ if dataType == Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
    case raw: Long => new Time(raw / 1000)
    case raw: Int => new Time(raw.toLong / 1000)
  }

  case _ if dataType == Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
    case raw: Long => new Timestamp(raw / 1000)
    case raw: Int => new Timestamp(raw.toLong / 1000)
  }

  case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT =>
    (obj: Any) => nullSafeConvert(obj) {
      case raw: Long => Instant.ofEpochMilli(raw / 1000)
      case raw: Int => Instant.ofEpochMilli(raw.toLong / 1000)
    }

  case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) {
    case raw: Long => raw / 1000
    case raw: Int => raw.toLong / 1000
  }

  case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
    case _ => obj.toString
  }

  // Must stay ahead of the generic array branch: byte[] is also a PrimitiveArrayTypeInfo,
  // but it additionally accepts a String (encoded as UTF-8) and passes byte arrays through.
  case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO =>
    (obj: Any) => nullSafeConvert(obj) {
      case text: String => text.getBytes(StandardCharsets.UTF_8)
      case bytes
        if bytes.getClass.isArray && bytes.getClass.getComponentType.getName == "byte" =>
        bytes
    }

  case _: PrimitiveArrayTypeInfo[_] |
       _: BasicArrayTypeInfo[_, _] |
       _: ObjectArrayTypeInfo[_, _] =>
    val elementType = dataType match {
      case primitive: PrimitiveArrayTypeInfo[_] => primitive.getComponentType
      case basic: BasicArrayTypeInfo[_, _] => basic.getComponentInfo
      case objects: ObjectArrayTypeInfo[_, _] => objects.getComponentInfo
    }
    // Only primitive array type infos are created unboxed.
    val boxed = dataType match {
      case _: PrimitiveArrayTypeInfo[_] => false
      case _ => true
    }
    val elementFromJava = convertTo(elementType)
    (obj: Any) => nullSafeConvert(obj) {
      case list: java.util.List[_] =>
        createArray(elementType, list.size(), i => elementFromJava(list.get(i)), boxed)
      case array if array.getClass.isArray =>
        val elements = array.asInstanceOf[Array[_]]
        createArray(elementType, elements.length, i => elementFromJava(elements(i)), boxed)
    }

  case mapType: MapTypeInfo[_, _] =>
    val keyFromJava = convertTo(mapType.getKeyTypeInfo)
    val valueFromJava = convertTo(mapType.getValueTypeInfo)
    (obj: Any) => nullSafeConvert(obj) {
      case source: java.util.Map[_, _] =>
        // Copy into a fresh HashMap, converting each key and value on the way.
        val converted = new java.util.HashMap[Any, Any]
        val copyEntry = new BiConsumer[Any, Any] {
          override def accept(key: Any, value: Any): Unit =
            converted.put(keyFromJava(key), valueFromJava(value))
        }
        source.forEach(copyEntry)
        converted
    }

  case rowType: RowTypeInfo =>
    val fieldsFromJava = rowType.getFieldTypes.map(fieldType => convertTo(fieldType))
    (obj: Any) => nullSafeConvert(obj) {
      case values if values.getClass.isArray =>
        val fields = values.asInstanceOf[Array[_]]
        val expected = rowType.getFieldTypes.length
        if (fields.length != expected) {
          throw new IllegalStateException(
            s"Input row doesn't have expected number of values required by the schema. " +
              s"${expected} fields are required while ${fields.length} " +
              s"values are provided."
          )
        }
        val row = new Row(fields.length)
        var pos = 0
        while (pos < fields.length) {
          row.setField(pos, fieldsFromJava(pos)(fields(pos)))
          pos += 1
        }
        row
    }

  case tupleType: TupleTypeInfo[_] =>
    // One converter per tuple position, resolved once when the converter is built.
    val fieldsFromJava: Array[Any => Any] =
      Array.tabulate[Any => Any](tupleType.getArity) { pos =>
        convertTo(tupleType.getTypeAt(pos))
      }
    (obj: Any) => nullSafeConvert(obj) {
      case values if values.getClass.isArray =>
        val fields = values.asInstanceOf[Array[_]]
        val expected = tupleType.getArity
        if (fields.length != expected) {
          throw new IllegalStateException(
            s"Input tuple doesn't have expected number of values required by the schema. " +
              s"${expected} fields are required while ${fields.length} " +
              s"values are provided."
          )
        }
        val tuple = Tuple.newInstance(fields.length)
        var pos = 0
        while (pos < fields.length) {
          tuple.setField(fieldsFromJava(pos)(fields(pos)), pos)
          pos += 1
        }
        tuple
    }

  // Anything else (e.g. a user-defined type) is passed through untouched.
  case _ => (obj: Any) => obj
}