in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala [98:134]
/**
 * Measures how far a keyed sample is from the expected per-key population.
 *
 * The keys of `sampled` are reduced to a single (total count, per-key count) pair. Against the
 * `popPerKey` side input this yields two relative differences:
 *   - total: `(pop * distinctKeys - totalCount) / (pop * distinctKeys)`
 *   - per key: `(pop - countForKey) / pop`
 *
 * A positive difference means fewer elements were sampled than `pop`; a negative one means more.
 * Only keys that actually occur in `sampled` appear in the resulting map.
 *
 * @param s         not read by this method
 * @param sampled   keyed elements that were selected by the sampler
 * @param keyFn     not read by this method
 * @param prob      not read by this method
 * @param popPerKey side input holding the per-key reference count the sample is compared with
 *                  (presumably the target sample size per key -- confirm against the caller)
 * @param exact     when true, fail if the total or any per-key difference is greater than
 *                  `errorTolerance`; negative differences never trigger the failure
 * @return the total relative difference together with the relative difference for each key
 * @throws Exception if `exact` is set and a difference exceeds `errorTolerance`
 */
private[samplers] def buildUniformDiffs[T: ClassTag: Coder, U: ClassTag: Coder](
  s: SCollection[T],
  sampled: SCollection[(U, T)],
  keyFn: T => U,
  prob: Double,
  popPerKey: SideInput[Double],
  exact: Boolean = false
): SCollection[(Double, Map[U, Double])] = {
  // Each sampled key contributes one to the overall total and one to its own bucket;
  // `sum` merges these contributions into a single (total, per-key counts) pair.
  val sampledCounts = sampled.keys
    .map(key => (1L, Map[U, Long](key -> 1L)))
    .sum

  sampledCounts
    .withSideInputs(popPerKey)
    .map { case ((sampledTotal, sampledPerKey), ctx) =>
      val pop = ctx(popPerKey)
      // Expected overall size: the per-key reference times the number of distinct sampled keys.
      val expectedTotal = pop * sampledPerKey.size
      val totalDiff = (expectedTotal - sampledTotal) / expectedTotal
      val keyDiffs = sampledPerKey.keySet.map { key =>
        key -> (pop - sampledPerKey.getOrElse(key, 0L)) / pop
      }.toMap

      if (exact && totalDiff > errorTolerance) {
        throw new Exception(
          s"Total elements sampled off by ${totalDiff * 100}% (> ${errorTolerance * 100}%)"
        )
      }
      if (exact) {
        // Fails on the first key (in map iteration order) that is short by more than the tolerance.
        for ((k, diff) <- keyDiffs if diff > errorTolerance) {
          throw new Exception(
            s"Elements for key $k sample off by ${diff * 100}% (> ${errorTolerance * 100}%)"
          )
        }
      }

      (totalDiff, keyDiffs)
    }
    .toSCollection
}