in src/main/scala/com/spotify/bdrc/pipeline/TotalAndDistinctCount.scala [63:76]
/**
 * Scio implementation that delegates the whole computation to the `aggregator` value defined
 * elsewhere in this file.
 *
 * NOTE(review): the method name suggests the result is approximate (presumably a sketch-based
 * distinct count paired with a total); the exact semantics are determined by `aggregator` —
 * confirm there.
 *
 * @param input elements to count
 * @return the output of `SCollection.aggregate` with `aggregator`, as `(Long, Long)` pairs
 */
def scioApproximate(input: SCollection[String]): SCollection[(Long, Long)] = {
  val aggregated = input.aggregate(aggregator)
  aggregated
}
/**
 * Spark implementation using Algebird semigroups: counts the total number of elements in `input`
 * and the number of distinct elements.
 *
 * The first reduction computes per-element occurrence counts. Each distinct element then
 * contributes `(its count, 1)`. Summing those pairs under a single shared key yields
 * `(total count, distinct count)`.
 *
 * @param input elements to count
 * @return an RDD containing a single `(total count, distinct count)` pair, or an empty RDD when
 *         `input` is empty
 */
def sparkAlgebird(input: RDD[String]): RDD[(Long, Long)] = {
  import com.twitter.algebird.spark._
  input
    .map((_, 1L))
    .algebird
    .sumByKey[String, Long] // (element, occurrences of element)
    // Re-key every pair to the same constant key. Reducing again on the element key would be a
    // no-op, because keys are already unique after the first sumByKey. That would leave one
    // (count, 1) record per element instead of a single global total.
    .map { case (_, count) => ((), (count, 1L)) }
    .algebird
    .sumByKey[Unit, (Long, Long)] // ((), (total count, distinct count))
    .values
}