override def sample()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/ParquetSampler.scala [110:151]


  /**
   * Reads up to `n` records from the Parquet file referenced by `r` as Avro `GenericRecord`s.
   *
   * The Avro schema is obtained from the file via `ParquetIO.getAvroSchemaFromFile` and used to
   * build the read configuration for the `AvroParquetReader`.
   *
   * @param n    maximum number of records to return; must be > 0
   * @param head if `true`, return the first `n` records in read order; if `false`, return a
   *             reservoir sample of up to `n` records taken over the whole file
   * @return at most `n` records (fewer when the file contains fewer than `n` records)
   * @throws IllegalArgumentException if `n` is not positive
   */
  override def sample(n: Long, head: Boolean): Seq[GenericRecord] = {
    require(n > 0, "n must be > 0")

    val avroSchema = ParquetIO.getAvroSchemaFromFile(r.toString)
    logger.debug("Converted Parquet to Avro schema: {}", avroSchema)

    val jobConfig = ParquetIO.genericRecordReadConfig(avroSchema, r.toString)

    val reader = AvroParquetReader
      .builder[GenericRecord](BeamInputFile.of(r))
      .withConf(jobConfig)
      .build()

    // try/finally guarantees the reader is closed even when reading or sampling throws.
    try {
      val result = ArrayBuffer.empty[GenericRecord]

      // Both modes start the same way: fill the result with the first n records.
      // The reader signals end of input by returning null.
      var next = reader.read()
      while (result.size < n && next != null) {
        result.append(next)
        next = reader.read()
      }

      if (!head) {
        // Reservoir sampling: every record after the first n may replace a random slot.
        // `index` is the zero-based position of `next` in the stream.
        var index = n
        while (next != null) {
          // NOTE(review): assumes nextLong(bound) yields a value in [0, bound) — confirm
          // against its definition, which is outside this block.
          val loc = nextLong(index + 1)
          if (loc < n) {
            // loc < n and the reservoir holds exactly n elements here, so toInt is in range.
            result(loc.toInt) = next
          }
          next = reader.read()
          index += 1
        }
      }

      result.toList
    } finally {
      reader.close()
    }
  }