in flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java [95:307]
public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
TypeInformation<?> typeInfo;
if (fieldType.isPrimitive()) {
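// Primitive types are mapped by their Parquet primitive type name, refined by the
// OriginalType annotation when one is present.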
OriginalType originalType = fieldType.getOriginalType();
PrimitiveType primitiveType = fieldType.asPrimitiveType();
switch (primitiveType.getPrimitiveTypeName()) {
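// BINARY annotated as DECIMAL maps to BigDecimal; string-like annotations
// (UTF8, ENUM, JSON, BSON) and un-annotated BINARY map to String.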
case BINARY:
if (originalType != null) {
switch (originalType) {
case DECIMAL:
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
break;
case UTF8:
case ENUM:
case JSON:
case BSON:
typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+ " for primitive type BINARY");
}
} else {
typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
}
break;
case BOOLEAN:
typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
break;
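// INT32: time/date annotations map to SQL time types; integer-width annotations
// map to BYTE, SHORT, or INT; without an annotation it maps to INT.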
case INT32:
if (originalType != null) {
switch (originalType) {
case TIME_MICROS:
case TIME_MILLIS:
typeInfo = SqlTimeTypeInfo.TIME;
break;
case TIMESTAMP_MICROS:
case TIMESTAMP_MILLIS:
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case DATE:
typeInfo = SqlTimeTypeInfo.DATE;
break;
case UINT_8:
case UINT_16:
case UINT_32:
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
break;
case INT_8:
typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE;
break;
case INT_16:
typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT;
break;
case INT_32:
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException("Unsupported original type : "
+ originalType.name() + " for primitive type INT32");
}
} else {
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
}
break;
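// INT64: time/timestamp annotations map to SQL time types; INT_64, DECIMAL,
// and un-annotated values map to LONG.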
case INT64:
if (originalType != null) {
switch (originalType) {
case TIME_MICROS:
typeInfo = SqlTimeTypeInfo.TIME;
break;
case TIMESTAMP_MICROS:
case TIMESTAMP_MILLIS:
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case INT_64:
case DECIMAL:
typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException("Unsupported original type : "
+ originalType.name() + " for primitive type INT64");
}
} else {
typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
}
break;
case INT96:
// INT96 stores timestamp data; we read it with millisecond precision.
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case FLOAT:
typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
break;
case DOUBLE:
typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
break;
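// FIXED_LEN_BYTE_ARRAY maps to BigDecimal; only the DECIMAL annotation
// (or no annotation at all) is supported.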
case FIXED_LEN_BYTE_ARRAY:
if (originalType != null) {
switch (originalType) {
case DECIMAL:
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+ " for primitive type FIXED_LEN_BYTE_ARRAY");
}
} else {
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
}
break;
default:
throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
}
} else {
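// Non-primitive (group) types: LIST and MAP annotations map to array and map type
// information; an un-annotated group is converted recursively as a nested record.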
GroupType parquetGroupType = fieldType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
if (originalType != null) {
switch (originalType) {
case LIST:
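// Per the Parquet LIST specification the annotated group contains exactly one
// repeated field; the branches below also accept legacy 2-level encodings where
// the repeated field is the element itself.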
if (parquetGroupType.getFieldCount() != 1) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
Type repeatedType = parquetGroupType.getType(0);
if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
if (repeatedType.isPrimitive()) {
typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType);
} else {
// For backward compatibility, the element group name can be any string (element/array/other).
GroupType elementType = repeatedType.asGroupType();
// If the repeated field is a group with multiple fields, then its type is the element
// type and elements are required.
if (elementType.getFieldCount() > 1) {
for (Type type : elementType.getFields()) {
if (!type.isRepetition(Type.Repetition.REQUIRED)) {
throw new UnsupportedOperationException(
String.format("List field [%s] in List [%s] has to be required. ",
type.toString(), fieldType.getName()));
}
}
typeInfo = ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(elementType));
} else {
Type internalType = elementType.getType(0);
if (internalType.isPrimitive()) {
typeInfo = convertParquetPrimitiveListToFlinkArray(internalType);
} else {
// No special handling is needed for groups named array or tuple.
GroupType tupleGroup = internalType.asGroupType();
if (tupleGroup.getFieldCount() == 1 && tupleGroup.getFields().get(0)
.isRepetition(Type.Repetition.REQUIRED)) {
typeInfo = ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(internalType));
} else {
throw new UnsupportedOperationException(
String.format("Unrecgonized List schema [%s] according to Parquet"
+ " standard", parquetGroupType.toString()));
}
}
}
}
break;
case MAP_KEY_VALUE:
case MAP:
// The outer-most level must be a group annotated with MAP
// that contains a single field named key_value
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
// The middle level must be a repeated group with a key field for map keys
// and, optionally, a value field for map values. We cannot enforce the stricter check
// mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) here, because
// schemas generated by the Parquet library do not always carry that logical type annotation.
GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
|| mapKeyValType.getFieldCount() != 2) {
throw new UnsupportedOperationException(
"The middle level of Map should be single field named key_value. Invalid map type "
+ parquetGroupType);
}
Type keyType = mapKeyValType.getType(0);
// The key field encodes the map's key type. This field must have repetition required and
// must always be present.
if (!keyType.isPrimitive() || !keyType.isRepetition(Type.Repetition.REQUIRED)
|| !keyType.asPrimitiveType().getPrimitiveTypeName().equals(
PrimitiveType.PrimitiveTypeName.BINARY)
|| !keyType.getOriginalType().equals(OriginalType.UTF8)) {
throw new IllegalArgumentException("Map key type must be required binary (UTF8): "
+ keyType);
}
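// Map keys are always read as String; the value type is converted recursively.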
Type valueType = mapKeyValType.getType(1);
return new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
convertParquetTypeToTypeInfo(valueType));
default:
throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
}
} else {
// If there is no original type, the group is a plain record and its fields are converted recursively.
return convertFields(parquetGroupType.getFields());
}
}
return typeInfo;
}
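
// A minimal usage sketch (not part of the original class): the example class name
// SchemaConversionExample and the schema string are illustrative, and it assumes
// parquet-column's MessageTypeParser is available on the classpath.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class SchemaConversionExample {
    public static void main(String[] args) {
        // Parse a small Parquet schema from its textual representation.
        MessageType schema = MessageTypeParser.parseMessageType(
                "message example {\n"
                        + "  required binary name (UTF8);\n"
                        + "  optional int32 age;\n"
                        + "}");
        // BINARY (UTF8) maps to STRING_TYPE_INFO, un-annotated INT32 to INT_TYPE_INFO.
        TypeInformation<?> nameType =
                ParquetSchemaConverter.convertParquetTypeToTypeInfo(schema.getType("name"));
        TypeInformation<?> ageType =
                ParquetSchemaConverter.convertParquetTypeToTypeInfo(schema.getType("age"));
        System.out.println(nameType + " / " + ageType);
    }
}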