in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala [290:394]
def readAvro[T <: SpecificRecord: ClassTag](
  projection: Schema = null,
  predicate: FilterPredicate = null,
  conf: Configuration = null
): PTransform[PCollection[ReadableFile], PCollection[T]] = {
  // Reading records unchanged is just the mapping overload with R = T and a pass-through fn.
  val passThrough: T => T = identity[T]
  readAvro[T, T](projection, passThrough, predicate, conf)
}
/**
 * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
 * supplied schema, then applies a mapping function to convert the Avro records into type `R`.
 *
 * No filter predicate is set and no explicit [[Configuration]] is supplied; both are forwarded as
 * `null` to the full `readAvro(projection, projectionFn, predicate, conf)` overload.
 *
 * @tparam T
 *   the Avro [[SpecificRecord]] type the Parquet records are decoded into
 * @tparam R
 *   the element type produced by `projectionFn`
 * @param projection
 *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`;
 *   when `null`, the full schema of `T` is requested
 * @param projectionFn
 *   a function mapping T => R
 */
def readAvro[T <: SpecificRecord: ClassTag, R](
  projection: Schema,
  projectionFn: T => R
): PTransform[PCollection[ReadableFile], PCollection[R]] =
  // Explicit type args + named nulls make the delegation target and the defaulted slots obvious.
  readAvro[T, R](projection, projectionFn, predicate = null, conf = null)
/**
 * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
 * supplied schema, then applies a mapping function to convert the Avro records into type `R`.
 *
 * No explicit [[Configuration]] is supplied; it is forwarded as `null` to the full
 * `readAvro(projection, projectionFn, predicate, conf)` overload.
 *
 * @tparam T
 *   the Avro [[SpecificRecord]] type the Parquet records are decoded into
 * @tparam R
 *   the element type produced by `projectionFn`
 * @param projection
 *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`;
 *   when `null`, the full schema of `T` is requested
 * @param projectionFn
 *   a function mapping T => R
 * @param predicate
 *   a Parquet [[FilterPredicate]] predicate; when `null`, no filter is set
 */
def readAvro[T <: SpecificRecord: ClassTag, R](
  projection: Schema,
  projectionFn: T => R,
  predicate: FilterPredicate
): PTransform[PCollection[ReadableFile], PCollection[R]] =
  // Explicit type args + a named null make the delegation target and the defaulted slot obvious.
  readAvro[T, R](projection, projectionFn, predicate, conf = null)
/**
 * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
 * supplied schema, then applies a mapping function to convert the Avro records into type `R`.
 *
 * No filter predicate is set; it is forwarded as `null` to the full
 * `readAvro(projection, projectionFn, predicate, conf)` overload.
 *
 * @tparam T
 *   the Avro [[SpecificRecord]] type the Parquet records are decoded into
 * @tparam R
 *   the element type produced by `projectionFn`
 * @param projection
 *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`;
 *   when `null`, the full schema of `T` is requested
 * @param projectionFn
 *   a function mapping T => R
 * @param conf
 *   a Parquet [[Configuration]]
 */
def readAvro[T <: SpecificRecord: ClassTag, R](
  projection: Schema,
  projectionFn: T => R,
  conf: Configuration
): PTransform[PCollection[ReadableFile], PCollection[R]] =
  // Explicit type args + named arguments make the delegation target and the null slot obvious.
  readAvro[T, R](projection, projectionFn, predicate = null, conf = conf)
/**
 * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
 * supplied schema, then applies a mapping function to convert the Avro records into type `R`.
 *
 * The Avro read schema is always the full schema of `T`; `projection` only controls the
 * requested projection passed to parquet-avro.
 *
 * @tparam T
 *   the Avro [[SpecificRecord]] type the Parquet records are decoded into
 * @tparam R
 *   the element type produced by `projectionFn`
 * @param projection
 *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`;
 *   when `null`, the full schema of `T` is requested
 * @param projectionFn
 *   a function mapping T => R; it is closure-cleaned and wrapped via `Functions.serializableFn`
 * @param predicate
 *   a Parquet [[FilterPredicate]] predicate; when `null`, no filter is set
 * @param conf
 *   a Parquet [[Configuration]]; may be `null`
 */
def readAvro[T <: SpecificRecord: ClassTag, R](
  projection: Schema,
  projectionFn: T => R,
  predicate: FilterPredicate,
  conf: Configuration
): PTransform[PCollection[ReadableFile], PCollection[R]] = {
  // NOTE(review): ofNullable presumably yields a default Configuration for a null `conf`; confirm
  // whether a non-null `conf` is copied or mutated in place by the setters below.
  val configuration = ParquetConfiguration.ofNullable(conf)
  val avroClass = ScioUtil.classOf[T]
  // Records are always materialized with T's full schema, regardless of the projection.
  val readSchema = ReflectData.get().getSchema(avroClass)
  AvroReadSupport.setAvroReadSchema(configuration, readSchema)
  // Without an explicit projection, request every field of T.
  AvroReadSupport.setRequestedProjection(configuration, Option(projection).getOrElse(readSchema))
  Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p))
  // Clean the user closure so it does not drag non-serializable outer references into the job.
  val cleanedFn = Functions.serializableFn(ClosureCleaner.clean(projectionFn))
  readFiles(
    ReadSupportFactory.avro[T],
    new SerializableConfiguration(configuration),
    cleanedFn
  )
}