def readAvroGenericRecordFiles()

in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetRead.scala [169:273]


  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the
   * supplied schema and emits each record as-is (no projection function is applied).
   *
   * @param schema
   *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
   * @param predicate
   *   an optional Parquet [[FilterPredicate]]; `null` (the default) applies no filter
   * @param conf
   *   an optional Parquet [[Configuration]]; `null` (the default) is forwarded unchanged to the
   *   full overload
   */
  def readAvroGenericRecordFiles(
    schema: Schema,
    predicate: FilterPredicate = null,
    conf: Configuration = null
  ): PTransform[PCollection[ReadableFile], PCollection[GenericRecord]] = {
    // Pass-through mapping: the output element type is the raw GenericRecord.
    val passThrough: GenericRecord => GenericRecord = record => record
    readAvroGenericRecordFiles(schema, passThrough, predicate, conf)
  }

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the
   * supplied schema, then maps each record to `T` with `projectionFn`. No filter predicate and no
   * custom configuration are used.
   *
   * @tparam T
   *   the output element type produced by `projectionFn`
   * @param schema
   *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
   * @param projectionFn
   *   a function mapping [[GenericRecord]] => T
   */
  def readAvroGenericRecordFiles[T](
    schema: Schema,
    projectionFn: GenericRecord => T
  ): PTransform[PCollection[ReadableFile], PCollection[T]] =
    readAvroGenericRecordFiles(
      schema = schema,
      projectionFn = projectionFn,
      predicate = null,
      conf = null
    )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the
   * supplied schema and filter, then maps each record to `T` with `projectionFn`. No custom
   * configuration is used.
   *
   * @tparam T
   *   the output element type produced by `projectionFn`
   * @param schema
   *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
   * @param projectionFn
   *   a function mapping [[GenericRecord]] => T
   * @param predicate
   *   a Parquet [[FilterPredicate]] predicate
   */
  def readAvroGenericRecordFiles[T](
    schema: Schema,
    projectionFn: GenericRecord => T,
    predicate: FilterPredicate
  ): PTransform[PCollection[ReadableFile], PCollection[T]] =
    readAvroGenericRecordFiles(
      schema = schema,
      projectionFn = projectionFn,
      predicate = predicate,
      conf = null
    )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the
   * supplied schema and configuration, then applies a mapping function to convert the Avro
   * records into type T. No filter predicate is applied.
   *
   * @tparam T
   *   the output element type produced by `projectionFn`
   * @param schema
   *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
   * @param projectionFn
   *   a function mapping [[GenericRecord]] => T
   * @param conf
   *   a Parquet [[Configuration]]
   */
  def readAvroGenericRecordFiles[T](
    schema: Schema,
    projectionFn: GenericRecord => T,
    conf: Configuration
  ): PTransform[PCollection[ReadableFile], PCollection[T]] = readAvroGenericRecordFiles(
    schema,
    projectionFn,
    // Explicit "no predicate". Forwarding only (schema, projectionFn, conf) would resolve back to
    // this same Configuration overload and recurse until StackOverflowError; supplying four
    // arguments selects the full implementation.
    null,
    conf
  )

  /**
   * A ReadFiles implementation that reads Parquet file(s) into Avro [[GenericRecord]]s using the
   * supplied schema, then applies `projectionFn` to convert each Avro record into type T.
   *
   * The configuration returned by `ParquetConfiguration.ofNullable(conf)` is updated as follows:
   *   - `AvroReadSupport.AVRO_COMPATIBILITY` is set to `false`
   *   - `schema` is registered as both the Avro read schema and the requested projection
   *   - `predicate`, when non-null, is installed as the Parquet filter predicate
   *   - [[GenericDataSupplier]] is installed as the Avro data supplier, but only if no supplier
   *     has been configured already
   *
   * @tparam T
   *   the output element type produced by `projectionFn`
   * @param schema
   *   The Avro [[Schema]] to use for Parquet reads; can be a projection of the full file schema
   * @param projectionFn
   *   a function mapping [[GenericRecord]] => T
   * @param predicate
   *   a Parquet [[FilterPredicate]] predicate; may be `null` for no filtering
   * @param conf
   *   a Parquet [[Configuration]]; may be `null`
   */
  def readAvroGenericRecordFiles[T](
    schema: Schema,
    projectionFn: GenericRecord => T,
    predicate: FilterPredicate,
    conf: Configuration
  ): PTransform[PCollection[ReadableFile], PCollection[T]] = {
    val parquetConf = ParquetConfiguration.ofNullable(conf)

    parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)
    // The same schema drives both how records are materialized and which columns are read.
    AvroReadSupport.setAvroReadSchema(parquetConf, schema)
    AvroReadSupport.setRequestedProjection(parquetConf, schema)

    if (predicate != null) {
      ParquetInputFormat.setFilterPredicate(parquetConf, predicate)
    }

    // Respect a data supplier the caller already configured; otherwise default to generic data.
    val supplierAlreadySet = Option(parquetConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER)).isDefined
    if (!supplierAlreadySet) {
      AvroReadSupport.setAvroDataSupplier(parquetConf, classOf[GenericDataSupplier])
    }

    // Strip non-serializable outer references before the function is shipped to workers.
    val serializableProjection = Functions.serializableFn(ClosureCleaner.clean(projectionFn))

    readFiles(
      ReadSupportFactory.avro,
      new SerializableConfiguration(parquetConf),
      serializableProjection
    )
  }