in scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/macros/impl/ParquetSchemaProvider.scala [9:83]
def toParquetSchemaImpl[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[String] = {
import c.universe._
if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe))
c.abort(
c.enclosingPosition,
s"""We cannot enforce ${T.tpe} is a case class, either it is not a case class or this macro call is possibly enclosed in a class.
This will mean the macro is operating on a non-resolved type."""
)
def matchField(fieldType: Type, originalFieldName: String, isOption: Boolean): Tree = {
val fieldName = fieldRenamer(originalFieldName)
val REPETITION_REQUIRED = q"_root_.org.apache.parquet.schema.Type.Repetition.REQUIRED"
val REPETITION_OPTIONAL = q"_root_.org.apache.parquet.schema.Type.Repetition.OPTIONAL"
val REPETITION_REPEATED = q"_root_.org.apache.parquet.schema.Type.Repetition.REPEATED"
def repetition: Tree = if (isOption) REPETITION_OPTIONAL else REPETITION_REQUIRED
def createPrimitiveTypeField(primitiveType: Tree): Tree =
q"""new _root_.org.apache.parquet.schema.PrimitiveType($repetition, $primitiveType, $fieldName)"""
fieldType match {
case tpe if tpe =:= typeOf[String] =>
createPrimitiveTypeField(q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY")
case tpe if tpe =:= typeOf[Boolean] =>
createPrimitiveTypeField(
q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN"
)
case tpe if tpe =:= typeOf[Short] || tpe =:= typeOf[Int] || tpe =:= typeOf[Byte] =>
createPrimitiveTypeField(q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32")
case tpe if tpe =:= typeOf[Long] =>
createPrimitiveTypeField(q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64")
case tpe if tpe =:= typeOf[Float] =>
createPrimitiveTypeField(q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT")
case tpe if tpe =:= typeOf[Double] =>
createPrimitiveTypeField(q"_root_.org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE")
case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
matchField(innerType, fieldName, isOption = true)
case tpe if tpe.erasure =:= typeOf[List[Any]] || tpe.erasure =:= typeOf[Set[_]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
val innerFieldsType = matchField(innerType, "element", isOption = false)
q"_root_.org.apache.parquet.schema.ConversionPatterns.listOfElements($repetition, $fieldName, $innerFieldsType)"
case tpe if tpe.erasure =:= typeOf[Map[_, Any]] =>
val List(keyType, valueType) = tpe.asInstanceOf[TypeRefApi].args
val keyFieldType = matchField(keyType, "key", isOption = false)
val valueFieldType = matchField(valueType, "value", isOption = false)
q"_root_.org.apache.parquet.schema.ConversionPatterns.mapType($repetition, $fieldName, $keyFieldType, $valueFieldType)"
case tpe if IsCaseClassImpl.isCaseClassType(c)(tpe) =>
q"new _root_.org.apache.parquet.schema.GroupType($repetition, $fieldName, ..${expandMethod(tpe)})"
case _ => c.abort(c.enclosingPosition, s"Case class $T has unsupported field type : $fieldType ")
}
}
def expandMethod(outerTpe: Type): List[Tree] =
outerTpe.declarations
.collect { case m: MethodSymbol if m.isCaseAccessor => m }
.map { accessorMethod =>
val fieldName = accessorMethod.name.toString
val fieldType = accessorMethod.returnType
matchField(fieldType, fieldName, isOption = false)
}
.toList
val expanded = expandMethod(T.tpe)
if (expanded.isEmpty)
c.abort(c.enclosingPosition, s"Case class $T.tpe has no fields we were able to extract")
val messageTypeName = s"${T.tpe}".split("\\.").last
val schema = q"""new _root_.org.apache.parquet.schema.MessageType($messageTypeName,
_root_.scala.Array.apply[_root_.org.apache.parquet.schema.Type](..$expanded):_*).toString"""
c.Expr[String](schema)
}