public static TypeInformation convertParquetTypeToTypeInfo()

in flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java [95:307]


	/**
	 * Converts a Parquet {@link Type} into the corresponding Flink {@link TypeInformation}.
	 *
	 * <p>Primitive types are mapped by their primitive type name and, when present, their
	 * {@link OriginalType} annotation. Group types annotated with {@code LIST} become arrays,
	 * groups annotated with {@code MAP} / {@code MAP_KEY_VALUE} become {@link MapTypeInfo}s with
	 * {@code String} keys, and groups without an original type are converted as records via
	 * {@code convertFields}.
	 *
	 * @param fieldType the Parquet type to convert
	 * @return the Flink type information describing {@code fieldType}
	 * @throws UnsupportedOperationException if the primitive / original type combination is not
	 *     supported, or a LIST / MAP group does not have the expected nested layout
	 * @throws IllegalArgumentException if a map key is not a required {@code BINARY} field
	 *     annotated as {@code UTF8}
	 */
	public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
		TypeInformation<?> typeInfo;
		if (fieldType.isPrimitive()) {
			OriginalType originalType = fieldType.getOriginalType();
			PrimitiveType primitiveType = fieldType.asPrimitiveType();
			switch (primitiveType.getPrimitiveTypeName()) {
				case BINARY:
					if (originalType != null) {
						switch (originalType) {
							case DECIMAL:
								typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
								break;
							case UTF8:
							case ENUM:
							case JSON:
							case BSON:
								typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
								break;
							default:
								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
									+ " for primitive type BINARY");
						}
					} else {
						// Un-annotated BINARY is exposed as a String
						typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
					}
					break;
				case BOOLEAN:
					typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
					break;
				case INT32:
					if (originalType != null) {
						switch (originalType) {
							case TIME_MICROS:
							case TIME_MILLIS:
								typeInfo = SqlTimeTypeInfo.TIME;
								break;
							case TIMESTAMP_MICROS:
							case TIMESTAMP_MILLIS:
								typeInfo = SqlTimeTypeInfo.TIMESTAMP;
								break;
							case DATE:
								typeInfo = SqlTimeTypeInfo.DATE;
								break;
							case UINT_8:
							case UINT_16:
							case UINT_32:
								typeInfo = BasicTypeInfo.INT_TYPE_INFO;
								break;
							case INT_8:
								typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE;
								break;
							case INT_16:
								typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT;
								break;
							case INT_32:
								typeInfo = BasicTypeInfo.INT_TYPE_INFO;
								break;
							default:
								throw new UnsupportedOperationException("Unsupported original type : "
									+ originalType.name() + " for primitive type INT32");
						}
					} else {
						typeInfo = BasicTypeInfo.INT_TYPE_INFO;
					}
					break;
				case INT64:
					if (originalType != null) {
						switch (originalType) {
							case TIME_MICROS:
								typeInfo = SqlTimeTypeInfo.TIME;
								break;
							case TIMESTAMP_MICROS:
							case TIMESTAMP_MILLIS:
								typeInfo = SqlTimeTypeInfo.TIMESTAMP;
								break;
							case INT_64:
							case DECIMAL:
								typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
								break;
							default:
								throw new UnsupportedOperationException("Unsupported original type : "
									+ originalType.name() + " for primitive type INT64");
						}
					} else {
						typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
					}
					break;
				case INT96:
					// It stores a timestamp type data, we read it as millisecond
					typeInfo = SqlTimeTypeInfo.TIMESTAMP;
					break;
				case FLOAT:
					typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
					break;
				case DOUBLE:
					typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
					break;
				case FIXED_LEN_BYTE_ARRAY:
					if (originalType != null) {
						switch (originalType) {
							case DECIMAL:
								typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
								break;
							default:
								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
									+ " for primitive type FIXED_LEN_BYTE_ARRAY");
						}
					} else {
						// Un-annotated FIXED_LEN_BYTE_ARRAY is treated as a decimal as well
						typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
					}
					break;
				default:
					throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
			}
		} else {
			GroupType parquetGroupType = fieldType.asGroupType();
			OriginalType originalType = parquetGroupType.getOriginalType();
			if (originalType != null) {
				switch (originalType) {
					case LIST:
						// A LIST group must wrap exactly one repeated field
						if (parquetGroupType.getFieldCount() != 1) {
							throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
						}
						Type repeatedType = parquetGroupType.getType(0);
						if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
							throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
						}

						if (repeatedType.isPrimitive()) {
							typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType);
						} else {
							// Backward-compatibility element group name can be any string (element/array/other)
							GroupType elementType = repeatedType.asGroupType();
							if (elementType.getFieldCount() == 0) {
								// An empty repeated group carries no element type; reject it explicitly
								// rather than failing on getType(0) below.
								throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
							}
							// If the repeated field is a group with multiple fields, then its type is the element
							// type and elements are required.
							if (elementType.getFieldCount() > 1) {

								for (Type type : elementType.getFields()) {
									if (!type.isRepetition(Type.Repetition.REQUIRED)) {
										throw new UnsupportedOperationException(
											String.format("List field [%s] in List [%s] has to be required. ",
												type.toString(), fieldType.getName()));
									}
								}
								typeInfo = ObjectArrayTypeInfo.getInfoFor(
									convertParquetTypeToTypeInfo(elementType));
							} else {
								Type internalType = elementType.getType(0);
								if (internalType.isPrimitive()) {
									typeInfo = convertParquetPrimitiveListToFlinkArray(internalType);
								} else {
									// No need to do special process for group named array and tuple
									GroupType tupleGroup = internalType.asGroupType();
									if (tupleGroup.getFieldCount() == 1 && tupleGroup.getFields().get(0)
										.isRepetition(Type.Repetition.REQUIRED)) {
										typeInfo = ObjectArrayTypeInfo.getInfoFor(
											convertParquetTypeToTypeInfo(internalType));
									} else {
										throw new UnsupportedOperationException(
											String.format("Unrecgonized List schema [%s] according to Parquet"
												+ " standard", parquetGroupType.toString()));
									}
								}
							}
						}
						break;

					case MAP_KEY_VALUE:
					case MAP:
						// The outer-most level must be a group annotated with MAP
						// that contains a single field named key_value
						if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
							throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
						}

						// The middle level  must be a repeated group with a key field for map keys
						// and, optionally, a value field for map values. But we can't enforce two strict condition here
						// the schema generated by Parquet lib doesn't contain LogicalType
						// ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
						GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
						if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
							|| mapKeyValType.getFieldCount() != 2) {
							throw new UnsupportedOperationException(
								"The middle level of Map should be single field named key_value. Invalid map type "
									+ parquetGroupType);
						}

						Type keyType = mapKeyValType.getType(0);

						// The key field encodes the map's key type. This field must have repetition required and
						// must always be present. getOriginalType() may be null for an un-annotated key, so the
						// enums are compared with != instead of equals() to report the intended error.
						if (!keyType.isPrimitive()
							|| !keyType.isRepetition(Type.Repetition.REQUIRED)
							|| keyType.asPrimitiveType().getPrimitiveTypeName()
								!= PrimitiveType.PrimitiveTypeName.BINARY
							|| keyType.getOriginalType() != OriginalType.UTF8) {
							throw new IllegalArgumentException("Map key type must be required binary (UTF8): "
								+ keyType);
						}

						Type valueType = mapKeyValType.getType(1);
						return new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
							convertParquetTypeToTypeInfo(valueType));
					default:
						throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
				}
			} else {
				// If there is no original type, then the group is a record
				return convertFields(parquetGroupType.getFields());
			}
		}

		return typeInfo;
	}