def saveStats[T]()

in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [436:541]


  /**
   * Writes the key, field and global statistics of a diff to the sink selected by `outputMode`.
   *
   * @param bigDiffy   diff result whose `keyStats`, `fieldStats` and `globalStats` are saved
   * @param output     base location; text output goes to `<output>/keys`, `<output>/fields`
   *                   and `<output>/global`, BigQuery output goes to the tables
   *                   `<output>_keys`, `<output>_fields` and `<output>_global`
   * @param withHeader for text output only: when true, each statistic is rendered with
   *                   `toString` and saved together with a tab-separated header line;
   *                   not consulted in `BQ` mode
   * @param outputMode `GCS` for text files, `BQ` for typed BigQuery tables
   * @tparam T type parameter of the supplied [[BigDiffy]]
   */
  def saveStats[T](
    bigDiffy: BigDiffy[T],
    output: String,
    withHeader: Boolean = false,
    outputMode: OutputMode = GCS
  ): Unit =
    outputMode match {
      case GCS =>
        // Text output: one path per statistic underneath `output`.
        val keysPath = s"$output/keys"
        val fieldsPath = s"$output/fields"
        val globalPath = s"$output/global"

        if (!withHeader) {
          // No header requested: hand the statistics to the text sink as they are.
          bigDiffy.keyStats.saveAsTextFile(keysPath)
          bigDiffy.fieldStats.saveAsTextFile(fieldsPath)
          bigDiffy.globalStats.saveAsTextFile(globalPath)
        } else {
          val separator = "\t"
          val keysHeader = Seq("key", "difftype").mkString(separator)
          // NOTE(review): "count" appears twice; the second one presumably is the delta-stats
          // count, mirroring the nested row layout used in the BQ branch below -- confirm.
          val fieldsHeader = Seq(
            "field",
            "count",
            "fraction",
            "deltaType",
            "min",
            "max",
            "count",
            "mean",
            "variance",
            "stddev",
            "skewness",
            "kurtosis"
          ).mkString(separator)
          val globalHeader =
            Seq("numTotal", "numSame", "numDiff", "numMissingLhs", "numMissingRhs")
              .mkString(separator)

          // Rows are stringified explicitly before being written alongside the header.
          bigDiffy.keyStats
            .map(_.toString)
            .saveAsTextFileWithHeader(keysPath, keysHeader)
          bigDiffy.fieldStats
            .map(_.toString)
            .saveAsTextFileWithHeader(fieldsPath, fieldsHeader)
          bigDiffy.globalStats
            .map(_.toString)
            .saveAsTextFileWithHeader(globalPath, globalHeader)
        }

      case BQ =>
        // BigQuery output: `withHeader` plays no role in this mode.
        val keysTable = s"${output}_keys"
        val fieldsTable = s"${output}_fields"
        val globalTable = s"${output}_global"

        bigDiffy.keyStats
          .map { keyStat =>
            val keyText = keyStat.keys.toString
            val diffTypeText = keyStat.diffType.toString
            val deltaRow = keyStat.delta.map { delta =>
              // Only typed deltas carry a delta type and value; anything else is recorded
              // as "UNKNOWN" without a value.
              val deltaValue = delta.delta match {
                case TypedDelta(deltaType, value) =>
                  DeltaValueBigQuery(deltaType.toString, Option(value))
                case _ =>
                  DeltaValueBigQuery("UNKNOWN", None)
              }
              // A missing side is written as the literal string "null".
              DeltaBigQuery(
                delta.field,
                delta.left.map(_.toString).getOrElse("null"),
                delta.right.map(_.toString).getOrElse("null"),
                deltaValue
              )
            }
            KeyStatsBigQuery(keyText, diffTypeText, deltaRow)
          }
          .saveAsTypedBigQueryTable(Table.Spec(keysTable))

        bigDiffy.fieldStats
          .map { fieldStat =>
            FieldStatsBigQuery(
              fieldStat.field,
              fieldStat.count,
              fieldStat.fraction,
              fieldStat.deltaStats.map { deltaStats =>
                DeltaStatsBigQuery(
                  deltaStats.deltaType.toString,
                  deltaStats.min,
                  deltaStats.max,
                  deltaStats.count,
                  deltaStats.mean,
                  deltaStats.variance,
                  deltaStats.stddev,
                  deltaStats.skewness,
                  deltaStats.kurtosis
                )
              }
            )
          }
          .saveAsTypedBigQueryTable(Table.Spec(fieldsTable))

        bigDiffy.globalStats
          .map { globalStat =>
            GlobalStatsBigQuery(
              globalStat.numTotal,
              globalStat.numSame,
              globalStat.numDiff,
              globalStat.numMissingLhs,
              globalStat.numMissingRhs
            )
          }
          .saveAsTypedBigQueryTable(Table.Spec(globalTable))
    }