in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [264:308]
/**
 * Aggregates per-record deltas into global diff counts and per-field statistics.
 *
 * Every record is mapped to a pair of
 *   - a 5-element count vector: a constant 1 followed by one-hot counters for
 *     `DiffType.SAME`, `DIFFERENT`, `MISSING_LHS` and `MISSING_RHS` (in that order), and
 *   - a map of field name -> (count, optional delta statistics) built from the record's deltas.
 *
 * The pairs are summed over the whole collection; the summed vector is turned into a
 * [[GlobalStats]] and every map entry into a [[FieldStats]].
 *
 * @param deltas    per-record deltas together with the record's diff type
 * @param ignoreNan when true, a typed delta whose value is NaN still counts towards the field
 *                  count but contributes no delta statistics
 * @return the global statistics paired with one [[FieldStats]] per field present in the
 *         summed map
 */
private def computeGlobalAndFieldStats(
  deltas: DeltaSCollection,
  ignoreNan: Boolean
): SCollection[(GlobalStats, Iterable[FieldStats])] = {
  // Semigroup[DeltaType.Value] so it can be propagated during sum over Map.
  // Combining two delta types simply keeps the left one.
  // NOTE(review): presumably a given field always carries the same delta type -- confirm.
  // The explicit type keeps the implicit from being inferred as an anonymous refinement type.
  implicit val deltaTypeSemigroup: Semigroup[DeltaType.Value] =
    new Semigroup[DeltaType.Value] {
      override def plus(l: DeltaType.Value, r: DeltaType.Value): DeltaType.Value = l
    }
  // Map value to be summed
  type MapVal = (Long, Option[(DeltaType.Value, Min[Double], Max[Double], Moments)])
  deltas
    .map { case (_, (ds, dt)) =>
      val m = mutable.Map.empty[String, MapVal]
      ds.foreach { d =>
        val optD = d.delta match {
          case UnknownDelta                             => None
          // NaN deltas are dropped from the statistics only when the caller asked for it
          case TypedDelta(_, v) if ignoreNan && v.isNaN => None
          case TypedDelta(t, v) =>
            Some((t, Min(v), Max(v), Moments.aggregator.prepare(v)))
        }
        // Map of field -> (count, delta statistics); a repeated field overwrites the
        // earlier entry for this record
        m(d.field) = (1L, optD)
      }
      // also sum global statistics
      val dtVec = dt match {
        case DiffType.SAME        => (1L, 1L, 0L, 0L, 0L)
        case DiffType.DIFFERENT   => (1L, 0L, 1L, 0L, 0L)
        case DiffType.MISSING_LHS => (1L, 0L, 0L, 1L, 0L)
        case DiffType.MISSING_RHS => (1L, 0L, 0L, 0L, 1L)
      }
      (dtVec, m.toMap)
    }
    .sum
    .map { case (dtVec, fieldMap) =>
      // Built once and reused for every field below
      val globalKeyStats = GlobalStats.tupled(dtVec)
      val numDiff = globalKeyStats.numDiff
      val fieldStats = fieldMap.map { case (field, (count, optD)) =>
        val deltaStats = optD.map(extractDeltaStats _)
        // Third argument is this field's count relative to globalKeyStats.numDiff
        FieldStats(field, count, count.toDouble / numDiff, deltaStats)
      }
      (globalKeyStats, fieldStats)
    }
}