private def computeGlobalAndFieldStats()

in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [264:308]


  private def computeGlobalAndFieldStats(
    deltas: DeltaSCollection,
    ignoreNan: Boolean
  ): SCollection[(GlobalStats, Iterable[FieldStats])] = {
    // Semigroup[DeltaType.Value] so it can be propagated during sum over Map
    implicit val deltaTypeSemigroup = new Semigroup[DeltaType.Value] {
      override def plus(l: DeltaType.Value, r: DeltaType.Value): DeltaType.Value = l
    }

    // Map value to be summed
    type MapVal = (Long, Option[(DeltaType.Value, Min[Double], Max[Double], Moments)])

    deltas
      .map { case (_, (ds, dt)) =>
        val m = mutable.Map.empty[String, MapVal]
        ds.foreach { d =>
          val optD = d.delta match {
            case UnknownDelta                             => None
            case TypedDelta(t, v) if ignoreNan && v.isNaN => None
            case TypedDelta(t, v) =>
              Some((t, Min(v), Max(v), Moments.aggregator.prepare(v)))
          }
          // Map of field -> (count, delta statistics)
          m(d.field) = (1L, optD)
        }
        // also sum global statistics
        val dtVec = dt match {
          case DiffType.SAME        => (1L, 1L, 0L, 0L, 0L)
          case DiffType.DIFFERENT   => (1L, 0L, 1L, 0L, 0L)
          case DiffType.MISSING_LHS => (1L, 0L, 0L, 1L, 0L)
          case DiffType.MISSING_RHS => (1L, 0L, 0L, 0L, 1L)
        }
        (dtVec, m.toMap)
      }
      .sum
      .map { case (dtVec, fieldMap) =>
        val globalKeyStats = GlobalStats.tupled(dtVec)
        val fieldStats = fieldMap.map { case (field, (count, optD)) =>
          val deltaStats = optD.map(extractDeltaStats _)
          val globalKeyStats = GlobalStats.tupled(dtVec)
          FieldStats(field, count, count.toDouble / globalKeyStats.numDiff, deltaStats)
        }
        (globalKeyStats, fieldStats)
      }
  }