in scalding-parquet/src/main/scala/com/twitter/scalding/parquet/tuple/macros/impl/ParquetReadSupportProvider.scala [17:243]
def toParquetReadSupportImpl[T](
ctx: Context
)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetReadSupport[T]] = {
import ctx.universe._
if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
ctx.abort(
ctx.enclosingPosition,
s"""We cannot enforce ${T.tpe} is a case class,
either it is not a case class or this macro call is possibly enclosed in a class.
This will mean the macro is operating on a non-resolved type."""
)
def buildGroupConverter(
tpe: Type,
converters: List[Tree],
converterGetters: List[Tree],
converterResetCalls: List[Tree],
valueBuilder: Tree
): Tree =
q"""new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter[$tpe]{
..$converters
override def currentValue: $tpe = $valueBuilder
override def getConverter(i: Int): _root_.org.apache.parquet.io.api.Converter = {
..$converterGetters
throw new RuntimeException("invalid index: " + i)
}
override def reset(): Unit = {
..$converterResetCalls
}
}"""
def matchField(idx: Int, fieldType: Type, collectionType: CollectionType): (Tree, Tree, Tree, Tree) = {
def fieldConverter(converterName: TermName, converter: Tree, isPrimitive: Boolean = false): Tree = {
def primitiveCollectionElementConverter: Tree =
q"""override val child: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[$fieldType] =
new _root_.com.twitter.scalding.parquet.tuple.scheme.CollectionElementPrimitiveConverter[$fieldType](this) {
override val delegate: _root_.com.twitter.scalding.parquet.tuple.scheme.PrimitiveFieldConverter[$fieldType] = $converter
}
"""
def caseClassFieldCollectionElementConverter: Tree =
q"""override val child: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[$fieldType] =
new _root_.com.twitter.scalding.parquet.tuple.scheme.CollectionElementGroupConverter[$fieldType](this) {
override val delegate: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[$fieldType] = $converter
}
"""
collectionType match {
case OPTION =>
val child =
if (isPrimitive) primitiveCollectionElementConverter
else caseClassFieldCollectionElementConverter
q"""
val $converterName = new _root_.com.twitter.scalding.parquet.tuple.scheme.OptionConverter[$fieldType] {
$child
}
"""
case LIST =>
val child =
if (isPrimitive) primitiveCollectionElementConverter
else caseClassFieldCollectionElementConverter
q"""
val $converterName = new _root_.com.twitter.scalding.parquet.tuple.scheme.ListConverter[$fieldType] {
$child
}
"""
case SET =>
val child =
if (isPrimitive) primitiveCollectionElementConverter
else caseClassFieldCollectionElementConverter
q"""
val $converterName = new _root_.com.twitter.scalding.parquet.tuple.scheme.SetConverter[$fieldType] {
$child
}
"""
case MAP => converter
case _ => q"val $converterName = $converter"
}
}
def createMapFieldConverter(
converterName: TermName,
K: Type,
V: Type,
keyConverter: Tree,
valueConverter: Tree
): Tree =
q"""val $converterName = new _root_.com.twitter.scalding.parquet.tuple.scheme.MapConverter[$K, $V] {
override val child: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[($K, $V)] =
new _root_.com.twitter.scalding.parquet.tuple.scheme.MapKeyValueConverter[$K, $V](this) {
override val keyConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[$K] = $keyConverter
override val valueConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.TupleFieldConverter[$V] = $valueConverter
}
}
"""
def createFieldMatchResult(converterName: TermName, converter: Tree): (Tree, Tree, Tree, Tree) = {
val converterGetter: Tree = q"if($idx == i) return $converterName"
val converterResetCall: Tree = q"$converterName.reset()"
val converterFieldValue: Tree = q"$converterName.currentValue"
(converter, converterGetter, converterResetCall, converterFieldValue)
}
def matchPrimitiveField(converterType: Type): (Tree, Tree, Tree, Tree) = {
val converterName = newTermName(ctx.fresh("fieldConverter"))
val innerConverter: Tree = q"new $converterType()"
val converter: Tree = fieldConverter(converterName, innerConverter, isPrimitive = true)
createFieldMatchResult(converterName, converter)
}
def matchCaseClassField(groupConverter: Tree): (Tree, Tree, Tree, Tree) = {
val converterName = newTermName(ctx.fresh("fieldConverter"))
val converter: Tree = fieldConverter(converterName, groupConverter)
createFieldMatchResult(converterName, converter)
}
def matchMapField(
K: Type,
V: Type,
keyConverter: Tree,
valueConverter: Tree
): (Tree, Tree, Tree, Tree) = {
val converterName = newTermName(ctx.fresh("fieldConverter"))
val mapConverter = createMapFieldConverter(converterName, K, V, keyConverter, valueConverter)
createFieldMatchResult(converterName, mapConverter)
}
fieldType match {
case tpe if tpe =:= typeOf[String] =>
matchPrimitiveField(typeOf[StringConverter])
case tpe if tpe =:= typeOf[Boolean] =>
matchPrimitiveField(typeOf[BooleanConverter])
case tpe if tpe =:= typeOf[Byte] =>
matchPrimitiveField(typeOf[ByteConverter])
case tpe if tpe =:= typeOf[Short] =>
matchPrimitiveField(typeOf[ShortConverter])
case tpe if tpe =:= typeOf[Int] =>
matchPrimitiveField(typeOf[IntConverter])
case tpe if tpe =:= typeOf[Long] =>
matchPrimitiveField(typeOf[LongConverter])
case tpe if tpe =:= typeOf[Float] =>
matchPrimitiveField(typeOf[FloatConverter])
case tpe if tpe =:= typeOf[Double] =>
matchPrimitiveField(typeOf[DoubleConverter])
case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
matchField(idx, innerType, OPTION)
case tpe if tpe.erasure =:= typeOf[List[Any]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
matchField(idx, innerType, LIST)
case tpe if tpe.erasure =:= typeOf[Set[_]] =>
val innerType = tpe.asInstanceOf[TypeRefApi].args.head
matchField(idx, innerType, SET)
case tpe if tpe.erasure =:= typeOf[Map[_, Any]] =>
val List(keyType, valueType) = tpe.asInstanceOf[TypeRefApi].args
val (keyConverter, _, _, _) = matchField(0, keyType, MAP)
val (valueConverter, _, _, _) = matchField(0, valueType, MAP)
matchMapField(keyType, valueType, keyConverter, valueConverter)
case tpe if IsCaseClassImpl.isCaseClassType(ctx)(tpe) =>
val (innerConverters, innerConvertersGetters, innerConvertersResetCalls, innerFieldValues) = unzip(
expandMethod(tpe)
)
val innerValueBuilderTree = buildTupleValue(tpe, innerFieldValues)
val converterTree: Tree = buildGroupConverter(
tpe,
innerConverters,
innerConvertersGetters,
innerConvertersResetCalls,
innerValueBuilderTree
)
matchCaseClassField(converterTree)
case _ => ctx.abort(ctx.enclosingPosition, s"Case class $T has unsupported field type : $fieldType ")
}
}
def expandMethod(outerTpe: Type): List[(Tree, Tree, Tree, Tree)] =
outerTpe.declarations
.collect { case m: MethodSymbol if m.isCaseAccessor => m }
.zipWithIndex
.map { case (accessorMethod, idx) =>
val fieldType = accessorMethod.returnType
matchField(idx, fieldType, NOT_A_COLLECTION)
}
.toList
def unzip(
treeTuples: List[(Tree, Tree, Tree, Tree)]
): (List[Tree], List[Tree], List[Tree], List[Tree]) = {
val emptyTreeList = List[Tree]()
treeTuples.foldRight(emptyTreeList, emptyTreeList, emptyTreeList, emptyTreeList) {
case ((t1, t2, t3, t4), (l1, l2, l3, l4)) =>
(t1 :: l1, t2 :: l2, t3 :: l3, t4 :: l4)
}
}
def buildTupleValue(tpe: Type, fieldValueBuilders: List[Tree]): Tree = {
if (fieldValueBuilders.isEmpty)
ctx.abort(ctx.enclosingPosition, s"Case class $tpe has no primitive types we were able to extract")
val companion = tpe.typeSymbol.companionSymbol
q"$companion(..$fieldValueBuilders)"
}
val (converters, converterGetters, convertersResetCalls, fieldValues) = unzip(expandMethod(T.tpe))
val groupConverter = buildGroupConverter(
T.tpe,
converters,
converterGetters,
convertersResetCalls,
buildTupleValue(T.tpe, fieldValues)
)
val schema = schemaProvider.toParquetSchemaImpl[T](ctx)
val readSupport = q"""
new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetReadSupport[$T]($schema) {
override val tupleConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter[$T] = $groupConverter
}
"""
ctx.Expr[ParquetReadSupport[T]](readSupport)
}