in parquet/src/main/scala/magnolify/parquet/Predicate.scala [50:102]
def onField[ScalaFieldT](
  fieldName: String
)(filterFn: ScalaFieldT => Boolean)(implicit
  pf: ParquetField.Primitive[ScalaFieldT]
): FilterPredicate = {
  // Builds a user-defined Parquet filter on the primitive column `fieldName`: every raw column
  // value is decoded to `ScalaFieldT` through `pf`'s converter and then tested with `filterFn`.
  //   fieldName - column name handed to the FilterApi column factory
  //   filterFn  - test applied to the decoded value; `true` keeps the record
  //   pf        - supplies the Parquet schema and the value converter for `ScalaFieldT`
  // Returns a FilterPredicate wrapping a serializable UserDefinedPredicate.
  val fieldType = pf.schema(CaseMapper.identity).asPrimitiveType().getPrimitiveTypeName

  // Select the FilterApi column whose value type corresponds to the primitive type.
  // NOTE(review): INT96 is routed through the long column / addLong path here and below;
  // confirm that this is intended and that some ParquetField actually produces INT96.
  val column = fieldType match {
    case PrimitiveTypeName.INT32                          => FilterApi.intColumn(fieldName)
    case PrimitiveTypeName.INT64 | PrimitiveTypeName.INT96 => FilterApi.longColumn(fieldName)
    case PrimitiveTypeName.BINARY                         => FilterApi.binaryColumn(fieldName)
    case PrimitiveTypeName.FLOAT                          => FilterApi.floatColumn(fieldName)
    case PrimitiveTypeName.DOUBLE                         => FilterApi.doubleColumn(fieldName)
    case PrimitiveTypeName.BOOLEAN                        => FilterApi.booleanColumn(fieldName)
    case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY           => FilterApi.binaryColumn(fieldName)
  }

  // Turns a raw Parquet value into a ScalaFieldT by pushing it into pf's converter with
  // `addFn` and reading the converted result back.
  def wrap[T](addFn: (PrimitiveConverter, T) => Unit): T => ScalaFieldT = {
    // Lazy: the converter is only created the first time a value is decoded
    // (presumably so the predicate can be serialized without a converter instance - confirm).
    lazy val converter = pf.newConverter
    value => {
      addFn(converter.asPrimitiveConverter(), value)
      converter.get
    }
  }

  type PC = PrimitiveConverter
  // The match yields functions over different boxed input types, so the result is cast to the
  // converter's own ParquetT input type.
  val toScalaT = (fieldType match {
    case PrimitiveTypeName.INT32 =>
      wrap((pc: PC, value: java.lang.Integer) => pc.addInt(value))
    case PrimitiveTypeName.INT64 | PrimitiveTypeName.INT96 =>
      wrap((pc: PC, value: java.lang.Long) => pc.addLong(value))
    case PrimitiveTypeName.BINARY =>
      wrap((pc: PC, value: Binary) => pc.addBinary(value))
    case PrimitiveTypeName.FLOAT =>
      wrap((pc: PC, value: java.lang.Float) => pc.addFloat(value))
    case PrimitiveTypeName.DOUBLE =>
      wrap((pc: PC, value: java.lang.Double) => pc.addDouble(value))
    case PrimitiveTypeName.BOOLEAN =>
      wrap((pc: PC, value: java.lang.Boolean) => pc.addBoolean(value))
    case PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY =>
      wrap((pc: PC, value: Binary) => pc.addBinary(value))
  }).asInstanceOf[pf.ParquetT => ScalaFieldT]

  FilterApi.userDefined[pf.ParquetT, UserDefinedPredicate[pf.ParquetT] with Serializable](
    column.asInstanceOf[Column[pf.ParquetT]],
    new UserDefinedPredicate[pf.ParquetT] with Serializable {
      // A null `value` cannot be decoded: unboxing it (or handing it to the converter) would
      // throw. Parquet may pass null for a missing column value, so treat it as "no match"
      // instead of failing; `filterFn` is never invoked for null.
      override def keep(value: pf.ParquetT): Boolean =
        value != null && filterFn(toScalaT(value))

      // Column statistics are never used to skip data; every value goes through `keep`.
      override def canDrop(statistics: Statistics[pf.ParquetT]): Boolean = false
      override def inverseCanDrop(statistics: Statistics[pf.ParquetT]): Boolean = false
    }
  )
}