in parquet/src/main/scala/magnolify/parquet/ParquetField.scala [374:458]
/**
 * Derives a [[ParquetField]] for `Map[K, V]` from the field instances of its key and value types.
 *
 * The schema is a required group named `"map"` annotated with the Parquet map logical type. It
 * wraps one repeated group (`KeyValueGroup`) holding the key field (renamed to `KeyField`, index
 * 0) and the value field (renamed to `ValueField`, index 1).
 *
 * @param pfKey
 *   field instance for keys; schema building fails via `require` unless its schema has
 *   `REQUIRED` repetition
 * @param pfValue
 *   field instance for values
 */
implicit def pfMap[K, V](implicit
  pfKey: ParquetField[K],
  pfValue: ParquetField[V]
): ParquetField[Map[K, V]] =
  new ParquetField[Map[K, V]] {
    override def buildSchema(cm: CaseMapper): Type = {
      val renamedKey = Schema.rename(pfKey.schema(cm), KeyField)
      require(renamedKey.isRepetition(Repetition.REQUIRED), "Map key must be required")
      val renamedValue = Schema.rename(pfValue.schema(cm), ValueField)

      // Inner repeated group: one occurrence per map entry.
      val entryGroup =
        Types.repeatedGroup().addField(renamedKey).addField(renamedValue).named(KeyValueGroup)

      // Outer required group carrying the map logical type annotation.
      Types.requiredGroup().addField(entryGroup).as(LogicalTypeAnnotation.mapType()).named("map")
    }

    override val hasAvroArray: Boolean = pfKey.hasAvroArray || pfValue.hasAvroArray

    override protected def isEmpty(v: Map[K, V]): Boolean = v.isEmpty

    // This instance contributes no field-level or type-level documentation.
    override def fieldDocs(cm: CaseMapper): Map[String, String] = Map.empty

    override val typeDoc: Option[String] = None

    /** Writes all entries of `v` into `c`; an empty map emits no events at all. */
    override def write(c: RecordConsumer, v: Map[K, V])(cm: CaseMapper): Unit = {
      // Emits one key/value group. The value field is skipped entirely when
      // `pfValue` reports the value as empty.
      def writeEntry(entryKey: K, entryValue: V): Unit = {
        c.startGroup()
        c.startField(KeyField, 0)
        pfKey.writeGroup(c, entryKey)(cm)
        c.endField(KeyField, 0)
        if (pfValue.nonEmpty(entryValue)) {
          c.startField(ValueField, 1)
          pfValue.writeGroup(c, entryValue)(cm)
          c.endField(ValueField, 1)
        }
        c.endGroup()
      }

      if (v.nonEmpty) {
        c.startGroup()
        c.startField(KeyValueGroup, 0)
        v.foreach { case (entryKey, entryValue) => writeEntry(entryKey, entryValue) }
        c.endField(KeyValueGroup, 0)
        c.endGroup()
      }
    }

    /**
     * Builds a three-layer converter: an outer group converter for the map itself, a delegate
     * that turns buffered pairs into a `Map`, and an inner group converter for each key/value
     * pair.
     */
    override def newConverter: TypeConverter[Map[K, V]] = {
      // Innermost layer: converts one key/value group and buffers the resulting pair.
      val pairConverter = new GroupConverter with TypeConverter.Buffered[(K, V)] {
        private val keyConverter = pfKey.newConverter
        private val valueConverter = pfValue.newConverter
        // Indexed by field position inside the key/value group: 0 = key, 1 = value.
        private val fieldConverters = Array(keyConverter, valueConverter)

        override def isPrimitive: Boolean = false

        override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

        override def start(): Unit = ()

        // Key is read before value, then the pair is buffered.
        override def end(): Unit =
          addValue((keyConverter.get, valueConverter.get))
      }.withRepetition(Repetition.REPEATED)

      // Middle layer: exposes the buffered pairs as a Map.
      val pairsAsMapConverter =
        new TypeConverter.Delegate[(K, V), Map[K, V]](pairConverter) {
          override def get: Map[K, V] = inner.get(pairs => pairs.toMap)
        }

      // Outermost layer: the map group only has a single child field (index 0).
      new GroupConverter with TypeConverter.Buffered[Map[K, V]] {
        override def getConverter(fieldIndex: Int): Converter = {
          require(fieldIndex == 0, "Map field index != 0")
          pairsAsMapConverter
        }

        override def start(): Unit = ()

        override def end(): Unit = addValue(pairsAsMapConverter.get)

        // Takes the first buffered map, falling back to an empty map when there is none.
        override def get: Map[K, V] =
          get(buffered => buffered.headOption.getOrElse(Map.empty))
      }
    }
  }