in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala [178:313]
/**
 * Samples a single input according to the parsed command-line arguments and returns the
 * [[ClosedTap]] of the written sample.
 *
 * Supported inputs: a BigQuery table (output must also be a BigQuery table), or a URI
 * pointing at Avro or Parquet files (output must also be a URI).
 *
 * Required args: `--sample` (fraction in (0, 1]), `--input`, `--output`.
 * Optional args: `--fields`, `--seed`, `--hashAlgorithm`, `--distribution`,
 * `--distributionFields`, `--exact`, `--bigqueryPartitioning`, `--byteEncoding`.
 *
 * @param argv raw command-line arguments
 * @return the closed tap of the sampled output
 * @throws IllegalArgumentException if required args are missing or invalid
 * @throws UnsupportedOperationException if the input is neither a BigQuery table nor an
 *         Avro/Parquet URI
 */
def singleInput(argv: Array[String]): ClosedTap[_] = {
  val (sc, args) = ContextAndArgs(argv)
  // Determines how large our heap should be for topByKey
  val sizePerKey = if (dataflowWorkerMemory(sc.options).exists(_ >= 32)) 1e9.toInt else 1e6.toInt
  val (
    samplePct,
    input,
    output,
    fields,
    seed,
    hashAlgorithm,
    distribution,
    distributionFields,
    exact,
    bigqueryPartitioning
  ) =
    try {
      val pct = args("sample").toFloat
      require(pct > 0.0f && pct <= 1.0f)
      (
        pct,
        args("input"),
        args("output"),
        args.list("fields"),
        args.optional("seed"),
        args.optional("hashAlgorithm").map(HashAlgorithm.fromString).getOrElse(FarmHash),
        args.optional("distribution").map(SampleDistribution.fromString),
        args.list("distributionFields"),
        Precision.fromBoolean(args.boolean("exact", default = false)),
        args.getOrElse("bigqueryPartitioning", "day")
      )
    } catch {
      // Intentionally broad: any failure while parsing args should print usage before
      // propagating, including fatal errors — the exception is always rethrown.
      case e: Throwable =>
        usage()
        throw e
    }
  val byteEncoding = ByteEncoding.fromString(args.getOrElse("byteEncoding", "raw"))
  if (fields.isEmpty) {
    log.warn("No fields to hash on specified, won't guarantee cohorts between datasets.")
  }
  if (seed.isEmpty) {
    log.warn("No seed specified, won't guarantee cohorts between datasets.")
  }
  if (distribution.isEmpty) {
    log.warn("No distribution specified, won't guarantee output distribution")
  }
  if (distribution.isDefined && distributionFields.isEmpty) {
    throw new IllegalArgumentException(
      "distributionFields must be specified if a distribution is given"
    )
  }
  // Parse once; reused both for dispatch and for the actual table references.
  val inputBigQueryTable = parseAsBigQueryTable(input)
  if (inputBigQueryTable.isDefined) {
    val outputBigQueryTable = parseAsBigQueryTable(output)
    require(
      outputBigQueryTable.isDefined,
      s"Input is a BigQuery table `$input`, output should be a BigQuery table too," +
        s"but instead it's `$output`."
    )
    require(
      List("DAY", "HOUR", "MONTH", "YEAR", "NULL").contains(bigqueryPartitioning.toUpperCase),
      s"bigqueryPartitioning must be either 'day', 'hour', 'month', 'year', or 'null', " +
        s"found $bigqueryPartitioning"
    )
    BigSamplerBigQuery.sample(
      sc,
      inputBigQueryTable.get,
      outputBigQueryTable.get,
      fields,
      samplePct,
      seed.map(_.toInt),
      hashAlgorithm,
      distribution,
      distributionFields,
      exact,
      sizePerKey,
      byteEncoding,
      bigqueryPartitioning.toUpperCase
    )
  } else if (parseAsURI(input).isDefined) {
    // right now only support for avro
    require(
      parseAsURI(output).isDefined,
      s"Input is a URI: `$input`, output should be a URI too, but instead it's `$output`."
    )
    // Prompts FileSystems to load service classes, otherwise fetching schema from non-local fails
    FileSystems.setDefaultPipelineOptions(sc.options)
    val fileNames = getMetadata(input).map(_.resourceId().getFilename)
    // Dispatch on the extension of files actually present at the input location.
    input match {
      case avroPath if fileNames.exists(_.endsWith("avro")) =>
        log.info(s"Found *.avro files in $avroPath, running BigSamplerAvro")
        BigSamplerAvro.sample(
          sc,
          avroPath,
          output,
          fields,
          samplePct,
          seed.map(_.toInt),
          hashAlgorithm,
          distribution,
          distributionFields,
          exact,
          sizePerKey,
          byteEncoding
        )
      case parquetPath if fileNames.exists(_.endsWith("parquet")) =>
        log.info(s"Found *.parquet files in $parquetPath, running BigSamplerParquet")
        BigSamplerParquet.sample(
          sc,
          parquetPath,
          output,
          fields,
          samplePct,
          seed.map(_.toInt),
          hashAlgorithm,
          distribution,
          distributionFields,
          exact,
          sizePerKey,
          byteEncoding
        )
      case _ =>
        throw new UnsupportedOperationException(s"File $input must be an Avro or Parquet file")
    }
  } else {
    throw new UnsupportedOperationException(s"Input `$input` not supported.")
  }
}