in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSamplerAvro.scala [222:265]
/**
 * Samples the Avro records found at `input` and writes the result under `output`, unless a
 * previous sample is already present there.
 *
 * The Avro schema is read from `input` before anything else. When `FileStorage` reports the
 * `part*` files under `output` as done, nothing is sampled and the pipeline is not run; a tap
 * over the existing files is returned instead. Otherwise the records are read, passed through
 * `sampleAvro`, saved as Avro under `output`, and the pipeline is run and awaited before the
 * tap is returned.
 *
 * @param sc                 context used to read the input and, when sampling, to run the
 *                           pipeline (it is run inside this method in that case)
 * @param input              location of the Avro input; also the source of the schema
 * @param output             directory the sample is written to, with or without a trailing `/`
 * @param fields             forwarded unchanged to `sampleAvro`
 * @param fraction           forwarded unchanged to `sampleAvro`
 * @param seed               forwarded unchanged to `sampleAvro`
 * @param hashAlgorithm      forwarded unchanged to `sampleAvro`
 * @param distribution       forwarded unchanged to `sampleAvro`
 * @param distributionFields forwarded unchanged to `sampleAvro`
 * @param precision          forwarded unchanged to `sampleAvro`
 * @param maxKeySize         forwarded unchanged to `sampleAvro`
 * @param byteEncoding       forwarded unchanged to `sampleAvro`; defaults to `RawEncoding`
 * @return a closed tap over either the pre-existing sample or the newly written one
 */
private[samplers] def sample(
  sc: ScioContext,
  input: String,
  output: String,
  fields: Seq[String],
  fraction: Double,
  seed: Option[Int],
  hashAlgorithm: HashAlgorithm,
  distribution: Option[SampleDistribution],
  distributionFields: Seq[String],
  precision: Precision,
  maxKeySize: Int,
  byteEncoding: ByteEncoding = RawEncoding
): ClosedTap[GenericRecord] = {
  val schema = AvroIO.getAvroSchemaFromFile(input)
  // Glob over the part files in `output`; at most one trailing slash is dropped before the
  // separator is re-added, so the result is the same with or without a trailing `/`.
  val outputParts = output.stripSuffix("/") + "/part*"
  // Needed implicitly by both the tap for a reused sample and the read/write of a fresh one.
  implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(schema)

  val previousSampleExists = FileStorage(outputParts).isDone
  if (!previousSampleExists) {
    log.info(s"Will sample from: $input, output will be $output")
    val sampled = sampleAvro(
      sc.avroFile(input, schema),
      fraction,
      schema,
      fields,
      seed,
      hashAlgorithm,
      distribution,
      distributionFields,
      precision,
      maxKeySize,
      byteEncoding
    )
    val writtenTap = sampled.saveAsAvroFile(output, schema = schema)
    // Block until the pipeline completes before handing the tap back.
    sc.run().waitUntilDone()
    writtenTap
  } else {
    log.info(s"Reuse previous sample at $outputParts")
    ClosedTap(MaterializeTap[GenericRecord](outputParts, sc))
  }
}