in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [684:797]
/** Command-line entry point; hands the arguments to [[run]] unchanged. */
def main(cmdlineArgs: Array[String]): Unit = {
  run(cmdlineArgs)
}
/**
 * Scio pipeline for BigDiffy.
 *
 * Parses the command-line arguments, validates them, builds the diff for the requested input
 * mode (`avro`, `parquet` or `bigquery`), hands the result to `saveStats` and then waits for the
 * pipeline run to finish.
 *
 * Arguments read: `input-mode`, `key`, `lhs`, `rhs`, `output`, `rowRestriction`, `with-header`,
 * `ignore`, `unordered`, `unorderedFieldKey`, `output-mode` and `ignore-nan`. `rowRestriction`
 * and `output-mode` are read as optional values; `with-header` and `ignore-nan` default to
 * `false`. When `output-mode` is absent, GCS is used.
 *
 * If argument parsing or the `unorderedFieldKey` mapping fails, `usage()` is called before the
 * original error is rethrown.
 *
 * @param cmdlineArgs raw command-line arguments, passed to `ContextAndArgs`
 * @throws IllegalArgumentException if the output mode or input mode is not supported, if the
 *                                  output mode is GCS but `output` does not start with `gs://`,
 *                                  or if `rowRestriction` is passed for avro or parquet inputs
 */
def run(cmdlineArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(cmdlineArgs)

  val (
    inputMode,
    keys,
    lhs,
    rhs,
    rowRestriction,
    output,
    header,
    ignore,
    unordered,
    unorderedKeysList,
    outputMode,
    ignoreNan
  ) = {
    try {
      (
        args("input-mode"),
        args.list("key"),
        args("lhs"),
        args("rhs"),
        args.optional("rowRestriction"),
        args("output"),
        args.boolean("with-header", false),
        args.list("ignore").toSet,
        args.list("unordered").toSet,
        args.list("unorderedFieldKey"),
        args.optional("output-mode"),
        args.boolean("ignore-nan", false)
      )
    } catch {
      // Only non-fatal failures are argument problems worth printing usage for; fatal JVM
      // errors (e.g. OutOfMemoryError) propagate untouched.
      case scala.util.control.NonFatal(e) =>
        usage()
        throw e
    }
  }

  val unorderedKeys = unorderedKeysMap(unorderedKeysList) match {
    case Success(m) => m
    case Failure(e) =>
      usage()
      throw e
  }

  val om: OutputMode = outputMode match {
    case Some("gcs")      => GCS
    case Some("bigquery") => BQ
    case None             => GCS
    // Unwrap the Option so the message shows the raw value rather than `Some(value)`.
    case Some(m) => throw new IllegalArgumentException(s"output mode $m not supported")
  }

  if (om == GCS && !output.startsWith("gs://")) {
    // if combo of inputs is invalid, error out early
    throw new IllegalArgumentException(
      s"Output mode is GCS, but output $output is not a valid GCS location"
    )
  }

  // validity checks passed, ok to run the diff
  val result = inputMode match {
    case "avro" =>
      if (rowRestriction.isDefined) {
        throw new IllegalArgumentException("rowRestriction cannot be passed for avro inputs")
      }
      val lhsSchema = avroFileSchema(lhs, sc.options)
      val rhsSchema = avroFileSchema(rhs, sc.options)
      // validate the rhs schema can be used to read lhs
      new SchemaValidatorBuilder().canReadStrategy
        .validateLatest()
        .validate(rhsSchema, Collections.singletonList(lhsSchema))
      if (lhsSchema != rhsSchema) {
        logger.warn("Schemas are different but compatible, using the rhs schema for diff")
      }
      // Both sides are read with the rhs schema.
      val schema = rhsSchema
      implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
      val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
      val lhsSCollection = sc.avroFile(lhs, schema)
      val rhsSCollection = sc.avroFile(rhs, schema)
      BigDiffy
        .diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)

    case "parquet" =>
      if (rowRestriction.isDefined) {
        throw new IllegalArgumentException("rowRestriction cannot be passed for Parquet inputs")
      }
      val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
      val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)(
        avroGenericRecordCoder(compatSchema)
      )
      // NOTE(review): `ignoreNan` is not forwarded here, unlike the avro and bigquery
      // branches -- confirm whether `diffParquet` should receive it.
      BigDiffy.diffParquet(sc, lhs, rhs, avroKeyFn(keys), diffy)

    case "bigquery" =>
      // TODO: handle schema evolution
      val bq = BigQuery.defaultInstance()
      val lSchema = bq.tables.schema(lhs)
      val rSchema = bq.tables.schema(rhs)
      val schema = mergeTableSchema(lSchema, rSchema)
      val diffy = new TableRowDiffy(schema, ignore, unordered, unorderedKeys)
      BigDiffy.diffTableRow(sc, lhs, rhs, rowRestriction, tableRowKeyFn(keys), diffy, ignoreNan)

    case m =>
      throw new IllegalArgumentException(s"input mode $m not supported")
  }

  saveStats(result, output, header, om)
  sc.run().waitUntilDone()
}