in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala [169:273]
def readAvroGenericRecordFiles(
  schema: Schema,
  predicate: FilterPredicate = null,
  conf: Configuration = null
): PTransform[PCollection[ReadableFile], PCollection[GenericRecord]] = {
  // Records are emitted unchanged: the projection step is a pass-through.
  val passThrough: GenericRecord => GenericRecord = record => record
  readAvroGenericRecordFiles(schema, passThrough, predicate, conf)
}
/**
 * A ReadFiles implementation that reads Parquet file(s) as Avro [[GenericRecord]]s using the
 * supplied schema, then converts each record to `T` with `projectionFn`.
 *
 * No filter predicate and no explicit configuration are supplied (both are passed on as `null`).
 *
 * @tparam T
 *   the element type of the resulting collection
 * @param schema
 *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
 * @param projectionFn
 *   a function mapping [[GenericRecord]] => T
 */
def readAvroGenericRecordFiles[T](
  schema: Schema,
  projectionFn: GenericRecord => T
): PTransform[PCollection[ReadableFile], PCollection[T]] = {
  val noPredicate: FilterPredicate = null
  val noConf: Configuration = null
  readAvroGenericRecordFiles(schema, projectionFn, noPredicate, noConf)
}
/**
 * A ReadFiles implementation that reads Parquet file(s) as Avro [[GenericRecord]]s using the
 * supplied schema and filter predicate, then converts each record to `T` with `projectionFn`.
 *
 * No explicit configuration is supplied (it is passed on as `null`).
 *
 * @tparam T
 *   the element type of the resulting collection
 * @param schema
 *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
 * @param projectionFn
 *   a function mapping [[GenericRecord]] => T
 * @param predicate
 *   a Parquet [[FilterPredicate]] predicate
 */
def readAvroGenericRecordFiles[T](
  schema: Schema,
  projectionFn: GenericRecord => T,
  predicate: FilterPredicate
): PTransform[PCollection[ReadableFile], PCollection[T]] = {
  val noConf: Configuration = null
  readAvroGenericRecordFiles(schema, projectionFn, predicate, noConf)
}
/**
 * A ReadFiles implementation that reads Parquet file(s) as Avro [[GenericRecord]]s using the
 * supplied schema and configuration, then converts each record to `T` with `projectionFn`.
 *
 * No filter predicate is applied (it is passed on as `null`).
 *
 * @tparam T
 *   the element type of the resulting collection
 * @param schema
 *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
 * @param projectionFn
 *   a function mapping [[GenericRecord]] => T
 * @param conf
 *   a Parquet [[Configuration]]
 */
def readAvroGenericRecordFiles[T](
  schema: Schema,
  projectionFn: GenericRecord => T,
  conf: Configuration
): PTransform[PCollection[ReadableFile], PCollection[T]] = {
  // Delegate to the four-argument implementation. A three-argument call with these argument
  // types would resolve back to this very overload and recurse until the stack overflows.
  val noPredicate: FilterPredicate = null
  readAvroGenericRecordFiles(schema, projectionFn, noPredicate, conf)
}
/**
 * A ReadFiles implementation that reads Parquet file(s) as Avro [[GenericRecord]]s using the
 * supplied schema, optional filter predicate and optional configuration, then converts each
 * record to `T` with `projectionFn`.
 *
 * The Avro read schema and the requested Parquet projection are both set to `schema`. When
 * `predicate` is non-null it is installed as the Parquet filter predicate. A
 * [[GenericDataSupplier]] is used unless the configuration already names an Avro data supplier.
 *
 * @tparam T
 *   the element type of the resulting collection
 * @param schema
 *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
 * @param projectionFn
 *   a function mapping [[GenericRecord]] => T
 * @param predicate
 *   a Parquet [[FilterPredicate]] predicate, or `null` for no filtering
 * @param conf
 *   a Parquet [[Configuration]], or `null`
 */
def readAvroGenericRecordFiles[T](
  schema: Schema,
  projectionFn: GenericRecord => T,
  predicate: FilterPredicate,
  conf: Configuration
): PTransform[PCollection[ReadableFile], PCollection[T]] = {
  // NOTE(review): whether ofNullable copies `conf` or returns it as-is is not visible here;
  // if it returns the same instance, the settings below also mutate the caller's object.
  val readConf = ParquetConfiguration.ofNullable(conf)

  // Read with the supplied Avro schema and request only that schema's columns from Parquet.
  readConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)
  AvroReadSupport.setAvroReadSchema(readConf, schema)
  AvroReadSupport.setRequestedProjection(readConf, schema)

  if (predicate != null) {
    ParquetInputFormat.setFilterPredicate(readConf, predicate)
  }

  // Keep a caller-configured data supplier; otherwise fall back to generic records.
  val supplierMissing = readConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null
  if (supplierMissing) {
    AvroReadSupport.setAvroDataSupplier(readConf, classOf[GenericDataSupplier])
  }

  // Clean and wrap the user function before building the transform.
  val recordMapper = Functions.serializableFn(ClosureCleaner.clean(projectionFn))
  readFiles(ReadSupportFactory.avro, new SerializableConfiguration(readConf), recordMapper)
}