public static DataFormatConverter getConverterForDataType()

in flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java [159:323]


	public static DataFormatConverter getConverterForDataType(DataType originDataType) {
		DataType dataType = originDataType.nullable();
		DataFormatConverter converter = TYPE_TO_CONVERTER.get(dataType);
		if (converter != null) {
			return converter;
		}

		Class<?> clazz = dataType.getConversionClass();
		LogicalType logicalType = dataType.getLogicalType();
		switch (logicalType.getTypeRoot()) {
			case CHAR:
			case VARCHAR:
				if (clazz == String.class) {
					return StringConverter.INSTANCE;
				} else if (clazz == StringData.class) {
					return StringDataConverter.INSTANCE;
				} else {
					throw new RuntimeException("Not support class for VARCHAR: " + clazz);
				}
			case BINARY:
			case VARBINARY:
				return PrimitiveByteArrayConverter.INSTANCE;
			case DECIMAL:
				Tuple2<Integer, Integer> ps = getPrecision(logicalType);
				if (clazz == BigDecimal.class) {
					return new BigDecimalConverter(ps.f0, ps.f1);
				} else if (clazz == DecimalData.class) {
					return new DecimalDataConverter(ps.f0, ps.f1);
				} else {
					throw new RuntimeException("Not support conversion class for DECIMAL: " + clazz);
				}
			case TIMESTAMP_WITHOUT_TIME_ZONE:
				int precisionOfTS = getDateTimePrecision(logicalType);
				if (clazz == Timestamp.class) {
					return new TimestampConverter(precisionOfTS);
				} else if (clazz == LocalDateTime.class) {
					return new LocalDateTimeConverter(precisionOfTS);
				} else if (clazz == TimestampData.class) {
					return new TimestampDataConverter(precisionOfTS);
				} else {
					throw new RuntimeException("Not support conversion class for TIMESTAMP WITHOUT TIME ZONE: " + clazz);
				}
			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
				int precisionOfLZTS = getDateTimePrecision(logicalType);
				if (clazz == Instant.class) {
					return new InstantConverter(precisionOfLZTS);
				} else if (clazz == Long.class || clazz == long.class) {
					return new LongTimestampDataConverter(precisionOfLZTS);
				} else if (clazz == TimestampData.class) {
					return new TimestampDataConverter(precisionOfLZTS);
				} else {
					throw new RuntimeException("Not support conversion class for TIMESTAMP WITH LOCAL TIME ZONE: " + clazz);
				}
			case ARRAY:
				if (clazz == ArrayData.class) {
					return ArrayDataConverter.INSTANCE;
				} else if (clazz == boolean[].class) {
					return PrimitiveBooleanArrayConverter.INSTANCE;
				} else if (clazz == short[].class) {
					return PrimitiveShortArrayConverter.INSTANCE;
				} else if (clazz == int[].class) {
					return PrimitiveIntArrayConverter.INSTANCE;
				} else if (clazz == long[].class) {
					return PrimitiveLongArrayConverter.INSTANCE;
				} else if (clazz == float[].class) {
					return PrimitiveFloatArrayConverter.INSTANCE;
				} else if (clazz == double[].class) {
					return PrimitiveDoubleArrayConverter.INSTANCE;
				}
				if (dataType instanceof CollectionDataType) {
					return new ObjectArrayConverter(
							((CollectionDataType) dataType).getElementDataType().bridgedTo(clazz.getComponentType()));
				} else {
					BasicArrayTypeInfo typeInfo =
							(BasicArrayTypeInfo) ((LegacyTypeInformationType) dataType.getLogicalType()).getTypeInformation();
					return new ObjectArrayConverter(
							fromLegacyInfoToDataType(typeInfo.getComponentInfo())
									.bridgedTo(clazz.getComponentType()));
				}
			case MAP:
				if (clazz == MapData.class) {
					return MapDataConverter.INSTANCE;
				}
				KeyValueDataType keyValueDataType = (KeyValueDataType) dataType;
				return new MapConverter(keyValueDataType.getKeyDataType(), keyValueDataType.getValueDataType());
			case MULTISET:
				if (clazz == MapData.class) {
					return MapDataConverter.INSTANCE;
				}
				CollectionDataType collectionDataType = (CollectionDataType) dataType;
				return new MapConverter(
						collectionDataType.getElementDataType(),
						DataTypes.INT().bridgedTo(Integer.class));
			case ROW:
			case STRUCTURED_TYPE:
				TypeInformation<?> asTypeInfo = fromDataTypeToTypeInfo(dataType);
				if (asTypeInfo instanceof InternalTypeInfo && clazz == RowData.class) {
					LogicalType realLogicalType = ((InternalTypeInfo<?>) asTypeInfo).toLogicalType();
					return new RowDataConverter(getFieldCount(realLogicalType));
				}

				// legacy

				CompositeType compositeType = (CompositeType) asTypeInfo;
				DataType[] fieldTypes = Stream.iterate(0, x -> x + 1).limit(compositeType.getArity())
						.map((Function<Integer, TypeInformation>) compositeType::getTypeAt)
						.map(TypeConversions::fromLegacyInfoToDataType).toArray(DataType[]::new);
				if (clazz == RowData.class) {
					return new RowDataConverter(compositeType.getArity());
				} else if (clazz == Row.class) {
					return new RowConverter(fieldTypes);
				} else if (Tuple.class.isAssignableFrom(clazz)) {
					return new TupleConverter((Class<Tuple>) clazz, fieldTypes);
				} else if (Product.class.isAssignableFrom(clazz)) {
					return new CaseClassConverter((TupleTypeInfoBase) compositeType, fieldTypes);
				} else {
					return new PojoConverter((PojoTypeInfo) compositeType, fieldTypes);
				}
			case RAW:
				if (logicalType instanceof RawType) {
					final RawType<?> rawType = (RawType<?>) logicalType;
					if (clazz == RawValueData.class) {
						return RawValueDataConverter.INSTANCE;
					} else {
						return new GenericConverter<>(rawType.getTypeSerializer());
					}
				}

				// legacy

				TypeInformation typeInfo = logicalType instanceof LegacyTypeInformationType ?
						((LegacyTypeInformationType) logicalType).getTypeInformation() :
						((TypeInformationRawType) logicalType).getTypeInformation();

				// planner type info
				if (typeInfo instanceof StringDataTypeInfo) {
					return StringDataConverter.INSTANCE;
				} else if (typeInfo instanceof DecimalDataTypeInfo) {
					DecimalDataTypeInfo decimalType = (DecimalDataTypeInfo) typeInfo;
					return new DecimalDataConverter(decimalType.precision(), decimalType.scale());
				} else if (typeInfo instanceof BigDecimalTypeInfo) {
					BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) typeInfo;
					return new BigDecimalConverter(decimalType.precision(), decimalType.scale());
				} else if (typeInfo instanceof TimestampDataTypeInfo) {
					TimestampDataTypeInfo timestampDataTypeInfo = (TimestampDataTypeInfo) typeInfo;
					return new TimestampDataConverter(timestampDataTypeInfo.getPrecision());
				} else if (typeInfo instanceof LegacyLocalDateTimeTypeInfo) {
					LegacyLocalDateTimeTypeInfo dateTimeType = (LegacyLocalDateTimeTypeInfo) typeInfo;
					return new LocalDateTimeConverter(dateTimeType.getPrecision());
				} else if (typeInfo instanceof LegacyTimestampTypeInfo) {
					LegacyTimestampTypeInfo timestampType = (LegacyTimestampTypeInfo) typeInfo;
					return new TimestampConverter(timestampType.getPrecision());
				} else if (typeInfo instanceof LegacyInstantTypeInfo) {
					LegacyInstantTypeInfo instantTypeInfo = (LegacyInstantTypeInfo) typeInfo;
					return new InstantConverter(instantTypeInfo.getPrecision());
				}

				if (clazz == RawValueData.class) {
					return RawValueDataConverter.INSTANCE;
				}
				return new GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
			default:
				throw new RuntimeException("Not support dataType: " + dataType);
		}
	}