in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/ParquetSampler.scala [110:151]
/**
 * Samples up to `n` records from the Parquet file at `r`, read as Avro [[GenericRecord]]s.
 *
 * @param n    maximum number of records to return; must be > 0
 * @param head if true, return the first `n` records in file order; otherwise keep a
 *             reservoir of `n` records, randomly replacing entries while scanning the
 *             rest of the file
 * @return at most `n` records (fewer when the file contains fewer than `n` records)
 * @throws IllegalArgumentException if `n <= 0`
 */
override def sample(n: Long, head: Boolean): Seq[GenericRecord] = {
  require(n > 0, "n must be > 0")
  val avroSchema = ParquetIO.getAvroSchemaFromFile(r.toString)
  logger.debug("Converted Parquet to Avro schema: {}", avroSchema)
  val jobConfig = ParquetIO.genericRecordReadConfig(avroSchema, r.toString)
  val reader = AvroParquetReader
    .builder[GenericRecord](BeamInputFile.of(r))
    .withConf(jobConfig)
    .build()
  // Close the reader even when reading or sampling throws, so the file handle is not leaked.
  try {
    val result = ArrayBuffer.empty[GenericRecord]
    // Take the first n records. In head mode this is the whole answer;
    // otherwise it is the initial reservoir.
    var next = reader.read()
    while (result.size < n && next != null) {
      result.append(next)
      next = reader.read()
    }
    if (!head) {
      // Reservoir sampling over the remaining records: the record at 0-based position
      // `index` overwrites slot `loc` when the random draw lands inside the reservoir.
      // NOTE(review): assumes nextLong(bound) is uniform over [0, bound) (Algorithm R) — confirm.
      var index = n
      while (next != null) {
        val loc = nextLong(index + 1)
        if (loc < n) {
          result(loc.toInt) = next
        }
        next = reader.read()
        index += 1
      }
    }
    result.toList
  } finally {
    reader.close()
  }
}