implicit def pfMap[K, V]()

in parquet/src/main/scala/magnolify/parquet/ParquetField.scala [374:458]


  implicit def pfMap[K, V](implicit
    pfKey: ParquetField[K],
    pfValue: ParquetField[V]
  ): ParquetField[Map[K, V]] = {
    new ParquetField[Map[K, V]] {
      override def buildSchema(cm: CaseMapper): Type = {
        val keySchema = Schema.rename(pfKey.schema(cm), KeyField)
        require(keySchema.isRepetition(Repetition.REQUIRED), "Map key must be required")
        val valueSchema = Schema.rename(pfValue.schema(cm), ValueField)
        val keyValue = Types
          .repeatedGroup()
          .addField(keySchema)
          .addField(valueSchema)
          .named(KeyValueGroup)
        Types
          .requiredGroup()
          .addField(keyValue)
          .as(LogicalTypeAnnotation.mapType())
          .named("map")
      }

      override val hasAvroArray: Boolean = pfKey.hasAvroArray || pfValue.hasAvroArray

      override protected def isEmpty(v: Map[K, V]): Boolean = v.isEmpty

      override def fieldDocs(cm: CaseMapper): Map[String, String] = Map.empty

      override val typeDoc: Option[String] = None

      override def write(c: RecordConsumer, v: Map[K, V])(cm: CaseMapper): Unit = {
        if (v.nonEmpty) {
          c.startGroup()
          c.startField(KeyValueGroup, 0)
          v.foreach { case (k, v) =>
            c.startGroup()
            c.startField(KeyField, 0)
            pfKey.writeGroup(c, k)(cm)
            c.endField(KeyField, 0)
            if (pfValue.nonEmpty(v)) {
              c.startField(ValueField, 1)
              pfValue.writeGroup(c, v)(cm)
              c.endField(ValueField, 1)
            }
            c.endGroup()
          }
          c.endField(KeyValueGroup, 0)
          c.endGroup()
        }
      }

      override def newConverter: TypeConverter[Map[K, V]] = {
        val kvConverter = new GroupConverter with TypeConverter.Buffered[(K, V)] {
          private val keyConverter = pfKey.newConverter
          private val valueConverter = pfValue.newConverter
          private val fieldConverters = Array(keyConverter, valueConverter)

          override def isPrimitive: Boolean = false

          override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

          override def start(): Unit = ()

          override def end(): Unit = {
            val key = keyConverter.get
            val value = valueConverter.get
            addValue(key -> value)
          }
        }.withRepetition(Repetition.REPEATED)

        val mapConverter = new TypeConverter.Delegate[(K, V), Map[K, V]](kvConverter) {
          override def get: Map[K, V] = inner.get(_.toMap)
        }

        new GroupConverter with TypeConverter.Buffered[Map[K, V]] {
          override def getConverter(fieldIndex: Int): Converter = {
            require(fieldIndex == 0, "Map field index != 0")
            mapConverter
          }
          override def start(): Unit = ()
          override def end(): Unit = addValue(mapConverter.get)
          override def get: Map[K, V] = get(_.headOption.getOrElse(Map.empty))
        }
      }
    }
  }