def scaldingApproximate()

in src/main/scala/com/spotify/bdrc/pipeline/TotalAndDistinctCount.scala [51:61]


  /**
   * Runs `input` through the shared `aggregator` and returns the resulting pair of counts.
   *
   * Presumably the pair is `(total count, distinct count)`, matching the other methods here.
   *
   * NOTE(review): the method name says "approximate", but the result depends entirely on
   * `aggregator`, which is defined outside this view. Confirm it is the approximate variant.
   *
   * @param input strings to aggregate
   * @return the `(Long, Long)` value produced by `aggregator`
   */
  def scaldingApproximate(input: TypedPipe[String]): TypedPipe[(Long, Long)] = {
    input.aggregate(aggregator)
  }

  /**
   * Computes the total number of elements and the number of distinct elements in `input`.
   *
   * Elements are first counted per distinct value. Each distinct value then contributes
   * `(its count, 1)` to a global sum, giving `(total count, distinct count)`.
   *
   * @param input strings to count
   * @return the globally summed `(total count, distinct count)` pair
   */
  def scioExact(input: SCollection[String]): SCollection[(Long, Long)] = {
    input
      .map((_, 1L))
      .sumByKey // (key, total count per key)
      // Keys are already unique after the first sumByKey, so summing by the same key again
      // would leave one (count, 1) pair per key. Drop the key and combine globally instead.
      .map { case (_, count) => (count, 1L) }
      .sum // (total count, distinct count)
  }