def main()

in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [684:797]


  /** Command-line entry point for BigDiffy; all of the work is delegated to `run`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    run(cmdlineArgs)
  }

  /**
   * Scio pipeline for BigDiffy.
   *
   * Parses and validates the command-line flags, then diffs the `--lhs` and `--rhs` data sets with
   * the reader selected by `--input-mode` (`avro`, `parquet` or `bigquery`). It passes the
   * result to `saveStats` together with `--output`, `--with-header` and the output mode. Finally it
   * runs the Scio pipeline and calls `waitUntilDone()` on it.
   *
   * @param cmdlineArgs raw command-line arguments, handed to Scio's `ContextAndArgs`
   * @throws IllegalArgumentException if `--output-mode` or `--input-mode` is unsupported, if the
   *   output mode is GCS but `--output` does not start with `gs://`, or if `--rowRestriction` is
   *   given for avro or parquet inputs
   * @note any non-fatal exception raised while reading the flags (or while parsing
   *   `--unorderedFieldKey`) is rethrown after `usage()` has been printed; for avro inputs the
   *   Avro schema validator may also throw if the rhs schema cannot read the lhs data.
   */
  def run(cmdlineArgs: Array[String]): Unit = {
    import scala.util.control.NonFatal

    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Read every flag up front so a malformed command line fails before any pipeline work.
    val (
      inputMode,
      keys,
      lhs,
      rhs,
      rowRestriction,
      output,
      header,
      ignore,
      unordered,
      unorderedKeysList,
      outputMode,
      ignoreNan
    ) = {
      try {
        (
          args("input-mode"),
          args.list("key"),
          args("lhs"),
          args("rhs"),
          args.optional("rowRestriction"),
          args("output"),
          args.boolean("with-header", false),
          args.list("ignore").toSet,
          args.list("unordered").toSet,
          args.list("unorderedFieldKey"),
          args.optional("output-mode"),
          args.boolean("ignore-nan", false)
        )
      } catch {
        // Only ordinary (non-fatal) failures earn a usage hint; fatal VM errors such as
        // OutOfMemoryError or InterruptedException propagate untouched.
        case NonFatal(e) =>
          usage()
          throw e
      }
    }

    val unorderedKeys = unorderedKeysMap(unorderedKeysList) match {
      case Success(m) => m
      case Failure(e) =>
        usage()
        throw e
    }

    // --output-mode is optional and defaults to GCS.
    val om: OutputMode = outputMode match {
      case Some("gcs") | None => GCS
      case Some("bigquery")   => BQ
      // Bind the raw value so the message reads "output mode foo", not "output mode Some(foo)".
      case Some(other) =>
        throw new IllegalArgumentException(s"output mode $other not supported")
    }

    // If the combination of flags is invalid, error out before touching any input data.
    if (om == GCS && !output.startsWith("gs://")) {
      throw new IllegalArgumentException(
        s"Output mode is GCS, but output $output is not a valid GCS location"
      )
    }

    // Validity checks passed, ok to run the diff.
    val result = inputMode match {
      case "avro" =>
        if (rowRestriction.isDefined) {
          throw new IllegalArgumentException("rowRestriction cannot be passed for avro inputs")
        }

        val lhsSchema = avroFileSchema(lhs, sc.options)
        val rhsSchema = avroFileSchema(rhs, sc.options)

        // Validate that the rhs schema can be used to read lhs data.
        new SchemaValidatorBuilder().canReadStrategy
          .validateLatest()
          .validate(rhsSchema, Collections.singletonList(lhsSchema))

        if (lhsSchema != rhsSchema) {
          logger.warn("Schemas are different but compatible, using the rhs schema for diff")
        }
        // Both sides are read with the rhs schema, which was checked above to be able to read lhs.
        val schema = rhsSchema

        implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
        val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
        val lhsSCollection = sc.avroFile(lhs, schema)
        val rhsSCollection = sc.avroFile(rhs, schema)
        BigDiffy
          .diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)
      case "parquet" =>
        if (rowRestriction.isDefined) {
          throw new IllegalArgumentException("rowRestriction cannot be passed for Parquet inputs")
        }
        val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
        val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)(
          avroGenericRecordCoder(compatSchema)
        )
        // NOTE(review): unlike the avro and bigquery branches, ignoreNan is not forwarded here,
        // so --ignore-nan has no effect on Parquet inputs; confirm whether diffParquet should
        // accept and honour it.
        BigDiffy.diffParquet(sc, lhs, rhs, avroKeyFn(keys), diffy)
      case "bigquery" =>
        // TODO: handle schema evolution
        val bq = BigQuery.defaultInstance()
        val lSchema = bq.tables.schema(lhs)
        val rSchema = bq.tables.schema(rhs)
        val schema = mergeTableSchema(lSchema, rSchema)
        val diffy = new TableRowDiffy(schema, ignore, unordered, unorderedKeys)
        BigDiffy.diffTableRow(sc, lhs, rhs, rowRestriction, tableRowKeyFn(keys), diffy, ignoreNan)
      case m =>
        throw new IllegalArgumentException(s"input mode $m not supported")
    }
    saveStats(result, output, header, om)

    sc.run().waitUntilDone()
  }