in src/main/scala/com/spotify/bdrc/pipeline/TotalAndDistinctCount.scala [51:61]
/**
 * Runs `aggregator` (defined outside this block) over the whole input pipe.
 *
 * NOTE(review): judging by the method name and the `(Long, Long)` result type, the pair is
 * presumably (total count, approximate distinct count) -- confirm against `aggregator`.
 *
 * @param input pipe of elements to aggregate
 * @return the result of aggregating `input` with `aggregator`
 */
def scaldingApproximate(input: TypedPipe[String]): TypedPipe[(Long, Long)] = {
  input.aggregate(aggregator)
}
/**
 * Computes the exact total number of elements and the exact number of distinct elements.
 *
 * @param input collection of elements to count
 * @return a collection holding the `(total count, distinct count)` pair for `input`
 */
def scioExact(input: SCollection[String]): SCollection[(Long, Long)] = {
  // Keys are already unique after `sumByKey`, so a second per-key sum would aggregate
  // nothing and emit one (count, 1) pair per key. Drop the key and reduce globally instead.
  input
    .map((_, 1L))
    .sumByKey // (key, total count per key)
    .map(kv => (kv._2, 1L)) // (total count per key, 1 for this distinct key)
    .sum // (total count, distinct count)
}