def singleInput()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala [178:313]


  /**
   * Runs a sampling job over a single input, dispatching on the input's type:
   * a BigQuery table spec, or a URI pointing at Avro or Parquet files.
   *
   * @param argv raw command-line arguments; parsed via [[ContextAndArgs]]. Required
   *             args: `sample` (fraction in (0, 1]), `input`, `output`. Optional:
   *             `fields`, `seed`, `hashAlgorithm`, `distribution`,
   *             `distributionFields`, `exact`, `bigqueryPartitioning`, `byteEncoding`.
   * @return the [[ClosedTap]] of the sampled output
   * @throws IllegalArgumentException   if a distribution is given without distributionFields,
   *                                    or a BigQuery/URI output constraint is violated
   * @throws UnsupportedOperationException if the input type cannot be handled
   */
  def singleInput(argv: Array[String]): ClosedTap[_] = {
    // Local import keeps the NonFatal fix self-contained within this method.
    import scala.util.control.NonFatal

    val (sc, args) = ContextAndArgs(argv)
    // Determines how large our heap should be for topByKey
    val sizePerKey = if (dataflowWorkerMemory(sc.options).exists(_ >= 32)) 1e9.toInt else 1e6.toInt

    val (
      samplePct,
      input,
      output,
      fields,
      seed,
      hashAlgorithm,
      distribution,
      distributionFields,
      exact,
      bigqueryPartitioning
    ) =
      try {
        val pct = args("sample").toFloat
        require(pct > 0.0f && pct <= 1.0f)
        (
          pct,
          args("input"),
          args("output"),
          args.list("fields"),
          args.optional("seed"),
          args.optional("hashAlgorithm").map(HashAlgorithm.fromString).getOrElse(FarmHash),
          args.optional("distribution").map(SampleDistribution.fromString),
          args.list("distributionFields"),
          Precision.fromBoolean(args.boolean("exact", default = false)),
          args.getOrElse("bigqueryPartitioning", "day")
        )
      } catch {
        // NonFatal (was Throwable): print usage for bad/missing args but let fatal
        // errors (OOM, InterruptedException, ...) propagate without the usage banner.
        case NonFatal(e) =>
          usage()
          throw e
      }

    val byteEncoding = ByteEncoding.fromString(args.getOrElse("byteEncoding", "raw"))

    if (fields.isEmpty) {
      log.warn("No fields to hash on specified, won't guarantee cohorts between datasets.")
    }

    if (seed.isEmpty) {
      log.warn("No seed specified, won't guarantee cohorts between datasets.")
    }

    if (distribution.isEmpty) {
      log.warn("No distribution specified, won't guarantee output distribution")
    }

    if (distribution.isDefined && distributionFields.isEmpty) {
      throw new IllegalArgumentException(
        "distributionFields must be specified if a distribution is given"
      )
    }

    if (parseAsBigQueryTable(input).isDefined) {
      // BigQuery input requires a BigQuery output and a valid partitioning scheme.
      require(
        parseAsBigQueryTable(output).isDefined,
        s"Input is a BigQuery table `$input`, output should be a BigQuery table too, " +
          s"but instead it's `$output`."
      )
      require(
        List("DAY", "HOUR", "MONTH", "YEAR", "NULL").contains(bigqueryPartitioning.toUpperCase),
        // Message now lists every accepted value, including 'hour'.
        s"bigqueryPartitioning must be either 'day', 'hour', 'month', 'year', or 'null', " +
          s"found $bigqueryPartitioning"
      )
      val inputTbl = parseAsBigQueryTable(input).get
      val outputTbl = parseAsBigQueryTable(output).get

      BigSamplerBigQuery.sample(
        sc,
        inputTbl,
        outputTbl,
        fields,
        samplePct,
        seed.map(_.toInt),
        hashAlgorithm,
        distribution,
        distributionFields,
        exact,
        sizePerKey,
        byteEncoding,
        bigqueryPartitioning.toUpperCase
      )
    } else if (parseAsURI(input).isDefined) {
      // right now only support for avro
      require(
        parseAsURI(output).isDefined,
        s"Input is a URI: `$input`, output should be a URI too, but instead it's `$output`."
      )
      // Prompts FileSystems to load service classes, otherwise fetching schema from non-local fails
      FileSystems.setDefaultPipelineOptions(sc.options)
      // Peek at the file names under the input to decide between Avro and Parquet.
      val fileNames = getMetadata(input).map(_.resourceId().getFilename)

      input match {
        case avroPath if fileNames.exists(_.endsWith("avro")) =>
          log.info(s"Found *.avro files in $avroPath, running BigSamplerAvro")
          BigSamplerAvro.sample(
            sc,
            avroPath,
            output,
            fields,
            samplePct,
            seed.map(_.toInt),
            hashAlgorithm,
            distribution,
            distributionFields,
            exact,
            sizePerKey,
            byteEncoding
          )
        case parquetPath if fileNames.exists(_.endsWith("parquet")) =>
          log.info(s"Found *.parquet files in $parquetPath, running BigSamplerParquet")
          BigSamplerParquet.sample(
            sc,
            parquetPath,
            output,
            fields,
            samplePct,
            seed.map(_.toInt),
            hashAlgorithm,
            distribution,
            distributionFields,
            exact,
            sizePerKey,
            byteEncoding
          )
        case _ =>
          throw new UnsupportedOperationException(s"File $input must be an Avro or Parquet file")
      }
    } else {
      // Closing backtick added: the message previously read `Input `$input not supported.`
      throw new UnsupportedOperationException(s"Input `$input` not supported.")
    }
  }