in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSamplerBigQuery.scala [144:203]
/**
 * Samples a BigQuery table into another BigQuery table.
 *
 * If `outputTbl` already exists it is reused as-is and no pipeline is run. Otherwise `inputTbl`
 * is read, sampled via `sampleTableRow`, and written to `outputTbl` using the input table's
 * schema with `WRITE_EMPTY` / `CREATE_IF_NEEDED`. The pipeline is then run and this method blocks
 * until it finishes.
 *
 * @param sc                   Scio context used to build and run the pipeline
 * @param inputTbl             table to sample from; must exist
 * @param outputTbl            destination table; reused without resampling if it already exists
 * @param fields               forwarded to `sampleTableRow`
 * @param fraction             forwarded to `sampleTableRow`
 * @param seed                 forwarded to `sampleTableRow`
 * @param hashAlgorithm        forwarded to `sampleTableRow`
 * @param distribution         forwarded to `sampleTableRow`
 * @param distributionFields   forwarded to `sampleTableRow`
 * @param precision            forwarded to `sampleTableRow`
 * @param sizePerKey           forwarded to `sampleTableRow`
 * @param byteEncoding         forwarded to `sampleTableRow`
 * @param bigqueryPartitioning value passed to `TimePartitioning(...)` for the output table, or the
 *                             literal "NULL" (or a null reference) to write without time
 *                             partitioning
 * @return a closed tap over `outputTbl`
 * @throws IllegalArgumentException if `outputTbl` does not exist and `inputTbl` cannot be found
 */
private[samplers] def sample(
  sc: ScioContext,
  inputTbl: TableReference,
  outputTbl: TableReference,
  fields: List[String],
  fraction: Double,
  seed: Option[Int],
  hashAlgorithm: HashAlgorithm,
  distribution: Option[SampleDistribution],
  distributionFields: List[String],
  precision: Precision,
  sizePerKey: Int,
  byteEncoding: ByteEncoding = RawEncoding,
  bigqueryPartitioning: String
): ClosedTap[TableRow] = {
  import BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
  import BigQueryIO.Write.WriteDisposition.WRITE_EMPTY
  val patchedBigQueryService = new PatchedBigQueryServicesImpl()
    .getDatasetService(sc.optionsAs[BigQueryOptions])

  // getTable yields null for a table that does not exist; wrap it so absence is explicit.
  if (Option(patchedBigQueryService.getTable(outputTbl)).isDefined) {
    log.info(s"Reuse previous sample at $outputTbl")
    ClosedTap(BigQueryTap(outputTbl))
  } else {
    log.info(s"Will sample from BigQuery table: $inputTbl, output will be $outputTbl")
    // Fail with an actionable message instead of an NPE on `.getSchema` when the input is missing.
    val inputTable = Option(patchedBigQueryService.getTable(inputTbl)).getOrElse(
      throw new IllegalArgumentException(s"Input BigQuery table $inputTbl does not exist")
    )
    val schema = inputTable.getSchema
    val coll = sc.bigQueryTable(Table.Ref(inputTbl))
    val sampledCollection = sampleTableRow(
      coll,
      fraction,
      schema,
      fields,
      seed,
      hashAlgorithm,
      distribution,
      distributionFields,
      precision,
      sizePerKey,
      byteEncoding
    )
    // "NULL" (or a null reference) is the sentinel for "no time partitioning". The sink API
    // takes a nullable TimePartitioning, hence `.orNull` at this boundary.
    val partitioning =
      Option(bigqueryPartitioning)
        .filterNot(_ == "NULL")
        .map(TimePartitioning(_))
        .orNull
    val tap = sampledCollection
      .saveAsBigQueryTable(
        Table.Ref(outputTbl),
        schema,
        WRITE_EMPTY,
        CREATE_IF_NEEDED,
        tableDescription = "",
        partitioning
      )
    // Block until the sampling pipeline has written the output before handing back the tap.
    sc.run().waitUntilDone()
    tap
  }
}