private[samplers] def buildUniformDiffs[T: ClassTag: Coder, U: ClassTag: Coder]()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala [98:134]


  /**
   * Computes how far the sampled element counts deviate from the per-key population value
   * supplied through `popPerKey`.
   *
   * The keys of `sampled` are counted both overall and per key. Each count is then turned into a
   * relative difference `(expected - actual) / expected`, where the per-key expectation is the
   * side-input value and the overall expectation is that value times the number of distinct keys
   * seen. A positive difference therefore means fewer elements than expected, a negative one
   * means more.
   *
   * @param s         not read by this method
   * @param sampled   keyed sample; only its keys are used
   * @param keyFn     not read by this method
   * @param prob      not read by this method
   * @param popPerKey side input holding the per-key value the counts are compared against
   * @param exact     when `true`, fail if the overall difference or any per-key difference is
   *                  greater than `errorTolerance` (defined outside this method); negative
   *                  differences never trigger a failure
   * @return the overall relative difference paired with the per-key relative differences
   * @throws Exception in `exact` mode when a difference exceeds `errorTolerance`
   */
  private[samplers] def buildUniformDiffs[T: ClassTag: Coder, U: ClassTag: Coder](
    s: SCollection[T],
    sampled: SCollection[(U, T)],
    keyFn: T => U,
    prob: Double,
    popPerKey: SideInput[Double],
    exact: Boolean = false
  ): SCollection[(Double, Map[U, Double])] = {
    // Each key contributes 1 to the grand total and 1 to its own bucket; `sum` merges them.
    val counted = sampled.keys
      .map(key => (1L, Map[U, Long](key -> 1L)))
      .sum

    counted
      .withSideInputs(popPerKey)
      .map { case ((sampledTotal, perKeyCounts), ctx) =>
        val expectedPerKey = ctx(popPerKey)
        val expectedTotal = expectedPerKey * perKeyCounts.size

        val overallDiff = (expectedTotal - sampledTotal) / expectedTotal
        val perKeyDiffs: Map[U, Double] = perKeyCounts.map { case (key, count) =>
          key -> (expectedPerKey - count) / expectedPerKey
        }

        if (exact) {
          if (overallDiff > errorTolerance) {
            throw new Exception(
              s"Total elements sampled off by ${overallDiff * 100}% (> ${errorTolerance * 100}%)"
            )
          }
          // Only the first offending key (in map iteration order) is reported.
          perKeyDiffs
            .find { case (_, keyDiff) => keyDiff > errorTolerance }
            .foreach { case (key, keyDiff) =>
              throw new Exception(
                s"Elements for key $key sample off by ${keyDiff * 100}% (> ${errorTolerance * 100}%)"
              )
            }
        }

        (overallDiff, perKeyDiffs)
      }
      .toSCollection
  }