private[samplers] def sample()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSamplerBigQuery.scala [144:203]


  /**
   * Samples rows from a BigQuery table and writes the sample to another BigQuery table.
   *
   * If `outputTbl` already exists, it is reused as-is and no pipeline is run. Otherwise:
   *  1. the schema of `inputTbl` is fetched;
   *  2. rows are sampled via `sampleTableRow`;
   *  3. the sample is written to `outputTbl` with `CREATE_IF_NEEDED` / `WRITE_EMPTY`;
   *  4. the pipeline is run and awaited before returning.
   *
   * @param sc                   Scio context used to read, write and run the pipeline
   * @param inputTbl             table to sample from; must exist
   * @param outputTbl            destination table; if it already exists it is reused unchanged
   * @param fields               forwarded to `sampleTableRow` (presumably the sampling-key
   *                             fields — confirm against `sampleTableRow`)
   * @param fraction             sampling fraction, forwarded to `sampleTableRow`
   * @param seed                 optional seed, forwarded to `sampleTableRow`
   * @param hashAlgorithm        forwarded to `sampleTableRow`
   * @param distribution         optional sample distribution, forwarded to `sampleTableRow`
   * @param distributionFields   forwarded to `sampleTableRow`
   * @param precision            forwarded to `sampleTableRow`
   * @param sizePerKey           forwarded to `sampleTableRow`
   * @param byteEncoding         forwarded to `sampleTableRow`; defaults to `RawEncoding`
   * @param bigqueryPartitioning time-partitioning type for the output table, or the literal
   *                             string `"NULL"` to write an unpartitioned table
   * @return a closed tap on `outputTbl`
   * @throws IllegalArgumentException if `outputTbl` does not exist and `inputTbl` cannot be found
   */
  private[samplers] def sample(
    sc: ScioContext,
    inputTbl: TableReference,
    outputTbl: TableReference,
    fields: List[String],
    fraction: Double,
    seed: Option[Int],
    hashAlgorithm: HashAlgorithm,
    distribution: Option[SampleDistribution],
    distributionFields: List[String],
    precision: Precision,
    sizePerKey: Int,
    byteEncoding: ByteEncoding = RawEncoding,
    bigqueryPartitioning: String
  ): ClosedTap[TableRow] = {
    import BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
    import BigQueryIO.Write.WriteDisposition.WRITE_EMPTY

    val patchedBigQueryService = new PatchedBigQueryServicesImpl()
      .getDatasetService(sc.optionsAs[BigQueryOptions])

    // getTable returns null when the table is missing; lift it into Option.
    Option(patchedBigQueryService.getTable(outputTbl)) match {
      case Some(_) =>
        log.info(s"Reuse previous sample at $outputTbl")
        ClosedTap(BigQueryTap(outputTbl))

      case None =>
        log.info(s"Will sample from BigQuery table: $inputTbl, output will be $outputTbl")
        // Fail fast with a descriptive error rather than an NPE on `.getSchema`.
        val schema = Option(patchedBigQueryService.getTable(inputTbl))
          .getOrElse(
            throw new IllegalArgumentException(s"Input table $inputTbl does not exist")
          )
          .getSchema

        val coll = sc.bigQueryTable(Table.Ref(inputTbl))

        val sampledCollection = sampleTableRow(
          coll,
          fraction,
          schema,
          fields,
          seed,
          hashAlgorithm,
          distribution,
          distributionFields,
          precision,
          sizePerKey,
          byteEncoding
        )

        // "NULL" is a sentinel for "no partitioning". The null is handed straight to
        // saveAsBigQueryTable, which takes a nullable TimePartitioning.
        val partitioning = bigqueryPartitioning match {
          case "NULL" => null
          case _      => TimePartitioning(bigqueryPartitioning)
        }
        val tap = sampledCollection
          .saveAsBigQueryTable(
            Table.Ref(outputTbl),
            schema,
            WRITE_EMPTY,
            CREATE_IF_NEEDED,
            tableDescription = "",
            partitioning
          )
        // Run the pipeline to completion so the returned tap points at materialized data.
        sc.run().waitUntilDone()
        tap
    }
  }