private def computeDeltas[T: Coder]()

in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [222:262]


  /**
   * Pairs up records from `lhs` and `rhs` by key and diffs each pair.
   *
   * Every record is keyed with `keyFn` and tagged with the side it came from ("l" or "r"). Both
   * sides are then unioned and grouped by key, and each group is classified as:
   *   - `SAME` / `DIFFERENT`: one record on each side; `diffy` returned no / some deltas
   *   - `MISSING_RHS`: only an LHS record exists for the key
   *   - `MISSING_LHS`: only an RHS record exists for the key
   *
   * A counter named after the diff type is incremented for every emitted element.
   *
   * @param lhs   left-hand side records
   * @param rhs   right-hand side records
   * @param diffy computes the deltas between an LHS record and an RHS record
   * @param keyFn extracts the join key from a record; the key must be unique within each side
   * @return one `(key, (deltas, diffType))` element per distinct key
   * @throws RuntimeException (when the pipeline runs) if a key occurs more than once on either
   *                          side
   */
  private def computeDeltas[T: Coder](
    lhs: SCollection[T],
    rhs: SCollection[T],
    diffy: Diffy[T],
    keyFn: T => MultiKey
  ): DeltaSCollection = {
    // extract keys and prefix records with L/R sub-key
    val lKeyed = lhs.map(t => (keyFn(t), ("l", t)))
    val rKeyed = rhs.map(t => (keyFn(t), ("r", t)))

    val accSame = ScioMetrics.counter[Long]("SAME")
    val accDiff = ScioMetrics.counter[Long]("DIFFERENT")
    val accMissingLhs = ScioMetrics.counter[Long]("MISSING_LHS")
    val accMissingRhs = ScioMetrics.counter[Long]("MISSING_RHS")

    (lKeyed ++ rKeyed).groupByKey
      .map { case (key, values) => // values is a list of tuples: "l" -> record or "r" -> record
        val recordCount = values.size
        if (recordCount > 2) {
          throw new RuntimeException(s"""More than two values found for key: $key.
               | Your key must be unique in both SCollections""".stripMargin)
        }

        val valuesMap = values.toMap // L/R -> record
        // toMap keeps a single entry per side, so two records coming from the same side would
        // silently collapse into one and the key would be misreported as missing on the other
        // side. Fewer map entries than records means the key is duplicated within one side.
        if (valuesMap.size < recordCount) {
          throw new RuntimeException(s"""Duplicate values found on one side for key: $key.
               | Your key must be unique in both SCollections""".stripMargin)
        }

        if (valuesMap.size == 2) {
          // the key is present on both sides: compare the two records
          val deltas: Seq[Delta] = diffy(valuesMap("l"), valuesMap("r"))
          val diffType = if (deltas.isEmpty) DiffType.SAME else DiffType.DIFFERENT
          (key, (deltas, diffType))
        } else {
          // the key is present on one side only: report which side lacks it
          val diffType = if (valuesMap.contains("l")) DiffType.MISSING_RHS else DiffType.MISSING_LHS
          (key, (Nil, diffType))
        }
      }
      .tap {
        case (_, (_, DiffType.SAME))        => accSame.inc()
        case (_, (_, DiffType.DIFFERENT))   => accDiff.inc()
        case (_, (_, DiffType.MISSING_LHS)) => accMissingLhs.inc()
        case (_, (_, DiffType.MISSING_RHS)) => accMissingRhs.inc()
        case _                              =>
      }
  }