in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [222:262]
/**
 * Pair up the records of two SCollections by key and diff each pair.
 *
 * Every record is keyed with `keyFn` and tagged with the side it came from ("l" or "r"); both
 * sides are then unioned and grouped by key. For each key the result carries the deltas
 * produced by `diffy` together with a [[DiffType]]:
 *  - `SAME` when both sides have a record and `diffy` returns no deltas,
 *  - `DIFFERENT` when both sides have a record and `diffy` returns at least one delta,
 *  - `MISSING_RHS` when only the left side has a record (deltas are `Nil`),
 *  - `MISSING_LHS` when only the right side has a record (deltas are `Nil`).
 *
 * One counter per diff type ("SAME", "DIFFERENT", "MISSING_LHS", "MISSING_RHS") is incremented
 * for every emitted element.
 *
 * @param lhs left-hand side records
 * @param rhs right-hand side records
 * @param diffy function object used to compute the deltas between a left and a right record
 * @param keyFn extracts the key used to match a left record with a right record
 * @return one `(key, (deltas, diffType))` element per distinct key
 * @throws RuntimeException (at pipeline execution time) if a key is not unique within either
 *                          input, i.e. any side contributes more than one record for a key
 */
private def computeDeltas[T: Coder](
  lhs: SCollection[T],
  rhs: SCollection[T],
  diffy: Diffy[T],
  keyFn: T => MultiKey
): DeltaSCollection = {
  // extract keys and prefix records with L/R sub-key
  val lKeyed = lhs.map(t => (keyFn(t), ("l", t)))
  val rKeyed = rhs.map(t => (keyFn(t), ("r", t)))
  val accSame = ScioMetrics.counter[Long]("SAME")
  val accDiff = ScioMetrics.counter[Long]("DIFFERENT")
  val accMissingLhs = ScioMetrics.counter[Long]("MISSING_LHS")
  val accMissingRhs = ScioMetrics.counter[Long]("MISSING_RHS")
  (lKeyed ++ rKeyed).groupByKey
    .map { case (key, values) => // values is a list of tuples: "l" -> record or "r" -> record
      // Cheap guard first: checked before materialising so a heavily duplicated key is
      // rejected without copying all of its records into a List.
      if (values.size > 2) {
        throw new RuntimeException(s"""More than two values found for key: $key.
                                      | Your key must be unique in both SCollections""".stripMargin)
      }
      val tagged = values.toList // at most two entries after the guard above
      val valuesMap = tagged.toMap // L/R -> record
      // toMap keeps a single entry per side tag, so a shrinking size means both records came
      // from the same side. Without this check such a key would be silently reported as
      // MISSING_LHS / MISSING_RHS and one of the duplicate records would be dropped.
      if (valuesMap.size != tagged.size) {
        throw new RuntimeException(
          s"""More than one value found for key: $key in the same SCollection.
             | Your key must be unique in both SCollections""".stripMargin)
      }
      if (valuesMap.size == 2) {
        val deltas: Seq[Delta] = diffy(valuesMap("l"), valuesMap("r"))
        val diffType = if (deltas.isEmpty) DiffType.SAME else DiffType.DIFFERENT
        (key, (deltas, diffType))
      } else {
        val diffType = if (valuesMap.contains("l")) DiffType.MISSING_RHS else DiffType.MISSING_LHS
        (key, (Nil, diffType))
      }
    }
    .tap {
      // Side effect only: bump the counter matching the element's diff type.
      case (_, (_, DiffType.SAME)) => accSame.inc()
      case (_, (_, DiffType.DIFFERENT)) => accDiff.inc()
      case (_, (_, DiffType.MISSING_LHS)) => accMissingLhs.inc()
      case (_, (_, DiffType.MISSING_RHS)) => accMissingRhs.inc()
      case _ => // any other diff type is not counted
    }
}