def readAvro[T <: SpecificRecord: ClassTag]()

in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala [290:394]


  def readAvro[T <: SpecificRecord: ClassTag](
    projection: Schema = null,
    predicate: FilterPredicate = null,
    conf: Configuration = null
  ): PTransform[PCollection[ReadableFile], PCollection[T]] =
    readAvro(projection, identity[T], predicate, conf)

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
   * supplied schema, then applies a mapping function to convert the Avro records into type T
   *
   * @param projection
   *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`
   * @param projectionFn
   *   a function mapping T => R
   */
  def readAvro[T <: SpecificRecord: ClassTag, R](
    projection: Schema,
    projectionFn: T => R
  ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro(
    projection,
    projectionFn,
    null,
    null
  )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
   * supplied schema, then applies a mapping function to convert the Avro records into type T
   *
   * @param projection
   *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`
   * @param projectionFn
   *   a function mapping T => R
   * @param predicate
   *   a Parquet [[FilterPredicate]] predicate
   */
  def readAvro[T <: SpecificRecord: ClassTag, R](
    projection: Schema,
    projectionFn: T => R,
    predicate: FilterPredicate
  ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro(
    projection,
    projectionFn,
    predicate,
    null
  )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
   * supplied schema, then applies a mapping function to convert the Avro records into type T
   *
   * @param projection
   *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`
   * @param projectionFn
   *   a function mapping T => R
   * @param conf
   *   a Parquet [[Configuration]]
   */
  def readAvro[T <: SpecificRecord: ClassTag, R](
    projection: Schema,
    projectionFn: T => R,
    conf: Configuration
  ): PTransform[PCollection[ReadableFile], PCollection[R]] = readAvro(
    projection,
    projectionFn,
    null,
    conf
  )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[SpecificRecord]]s using the
   * supplied schema, then applies a mapping function to convert the Avro records into type T
   *
   * @param projection
   *   an [[Schema]] used for Projection, made up of a subset of fields from the full Avro type `T`
   * @param projectionFn
   *   a function mapping T => R
   * @param predicate
   *   a Parquet [[FilterPredicate]] predicate
   * @param conf
   *   a Parquet [[Configuration]]
   */
  def readAvro[T <: SpecificRecord: ClassTag, R](
    projection: Schema,
    projectionFn: T => R,
    predicate: FilterPredicate,
    conf: Configuration
  ): PTransform[PCollection[ReadableFile], PCollection[R]] = {
    val configuration = ParquetConfiguration.ofNullable(conf)

    val avroClass = ScioUtil.classOf[T]
    val readSchema = ReflectData.get().getSchema(avroClass)
    AvroReadSupport.setAvroReadSchema(configuration, readSchema)
    AvroReadSupport.setRequestedProjection(configuration, Option(projection).getOrElse(readSchema))

    Option(predicate).foreach(p => ParquetInputFormat.setFilterPredicate(configuration, p))

    val cleanedFn = Functions.serializableFn(ClosureCleaner.clean(projectionFn))
    readFiles(
      ReadSupportFactory.avro[T],
      new SerializableConfiguration(configuration),
      cleanedFn
    )
  }