private def convertTo()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala [84:267]


  private def convertTo(dataType: TypeInformation[_]): Any => Any = dataType match {
    case _ if dataType == Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
      case b: Boolean => b
    }

    case _ if dataType == Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
      case c: Byte => c
      case c: Short => c.toByte
      case c: Int => c.toByte
      case c: Long => c.toByte
    }

    case _ if dataType == Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
      case c: Byte => c.toShort
      case c: Short => c
      case c: Int => c.toShort
      case c: Long => c.toShort
    }

    case _ if dataType == Types.INT => (obj: Any) => nullSafeConvert(obj) {
      case c: Byte => c.toInt
      case c: Short => c.toInt
      case c: Int => c
      case c: Long => c.toInt
    }

    case _ if dataType == Types.LONG => (obj: Any) => nullSafeConvert(obj) {
      case c: Byte => c.toLong
      case c: Short => c.toLong
      case c: Int => c.toLong
      case c: Long => c
    }

    case _ if dataType == Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
      case c: Float => c
      case c: Double => c.toFloat
    }

    case _ if dataType == Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
      case c: Float => c.toDouble
      case c: Double => c
    }

    case _ if dataType == Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
      case c: java.math.BigDecimal => c
    }

    case _ if dataType == Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
      case c: Int =>
        val millisLocal = c.toLong * 86400000
        val millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal)
        new Date(millisUtc)
    }

    case _ if dataType == Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
      case c: Long => new Time(c / 1000)
      case c: Int => new Time(c.toLong / 1000)
    }

    case _ if dataType == Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
      case c: Long => new Timestamp(c / 1000)
      case c: Int => new Timestamp(c.toLong / 1000)
    }

    case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT =>
      (obj: Any) => nullSafeConvert(obj) {
        case c: Long => Instant.ofEpochMilli(c / 1000)
        case c: Int => Instant.ofEpochMilli(c.toLong / 1000)
    }

    case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) {
      case c: Long => c / 1000
      case c: Int => c.toLong / 1000
    }

    case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
      case _ => obj.toString
    }

    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO =>
      (obj: Any) =>
        nullSafeConvert(obj) {
          case c: String => c.getBytes(StandardCharsets.UTF_8)
          case c if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c
        }

    case _: PrimitiveArrayTypeInfo[_] |
         _: BasicArrayTypeInfo[_, _] |
         _: ObjectArrayTypeInfo[_, _] =>
      var boxed = false
      val elementType = dataType match {
        case p: PrimitiveArrayTypeInfo[_] =>
          p.getComponentType
        case b: BasicArrayTypeInfo[_, _] =>
          boxed = true
          b.getComponentInfo
        case o: ObjectArrayTypeInfo[_, _] =>
          boxed = true
          o.getComponentInfo
      }
      val elementFromJava = convertTo(elementType)

      (obj: Any) => nullSafeConvert(obj) {
        case c: java.util.List[_] =>
          createArray(elementType,
                      c.size(),
                      i => elementFromJava(c.get(i)),
                      boxed)
        case c if c.getClass.isArray =>
          createArray(elementType,
                      c.asInstanceOf[Array[_]].length,
                      i => elementFromJava(c.asInstanceOf[Array[_]](i)),
                      boxed)
      }

    case m: MapTypeInfo[_, _] =>
      val keyFromJava = convertTo(m.getKeyTypeInfo)
      val valueFromJava = convertTo(m.getValueTypeInfo)

      (obj: Any) => nullSafeConvert(obj) {
        case javaMap: java.util.Map[_, _] =>
          val map = new java.util.HashMap[Any, Any]
          javaMap.forEach(new BiConsumer[Any, Any] {
            override def accept(k: Any, v: Any): Unit =
              map.put(keyFromJava(k), valueFromJava(v))
          })
          map
      }

    case rowType: RowTypeInfo =>
      val fieldsFromJava = rowType.getFieldTypes.map(f => convertTo(f))

      (obj: Any) => nullSafeConvert(obj) {
        case c if c.getClass.isArray =>
          val r = c.asInstanceOf[Array[_]]
          if (r.length != rowType.getFieldTypes.length) {
            throw new IllegalStateException(
              s"Input row doesn't have expected number of values required by the schema. " +
                s"${rowType.getFieldTypes.length} fields are required while ${r.length} " +
                s"values are provided."
              )
          }

          val row = new Row(r.length)
          var i = 0
          while (i < r.length) {
            row.setField(i, fieldsFromJava(i)(r(i)))
            i += 1
          }
          row
      }

    case tupleType: TupleTypeInfo[_] =>
      val fieldsTypes: Array[TypeInformation[_]] = new Array[TypeInformation[_]](tupleType.getArity)
      for ( i <- 0 until tupleType.getArity) {
        fieldsTypes(i) = tupleType.getTypeAt(i)
      }
      
      val fieldsFromJava: Array[Any => Any] = fieldsTypes.map(f => convertTo(f))
      
      (obj: Any) => nullSafeConvert(obj) {
        case c if c.getClass.isArray =>
          val r = c.asInstanceOf[Array[_]]
          if (r.length != tupleType.getArity) {
            throw new IllegalStateException(
              s"Input tuple doesn't have expected number of values required by the schema. " +
                s"${tupleType.getArity} fields are required while ${r.length} " +
                s"values are provided."
            )
          }
          
        val tuple = Tuple.newInstance(r.length)
        var i: Int = 0
        while(i < r.length){
          tuple.setField(fieldsFromJava(i)(r(i)), i)
          i += 1
        }
        tuple
      }
    

    // UserDefinedType
    case _ => (obj: Any) => obj
  }