def scaldingExact()

in src/main/scala/com/spotify/bdrc/pipeline/TotalAndDistinctCount.scala [39:49]


  /**
   * Compute the exact total count and distinct count of the items in `input`.
   *
   * The pipeline runs two aggregations: first a per-item count, then a single
   * global aggregation that sums those counts (total) while counting one per
   * distinct item (distinct).
   *
   * @param input pipe of items; duplicates are allowed
   * @return a pipe carrying one `(total count, distinct count)` pair
   */
  def scaldingExact(input: TypedPipe[String]): TypedPipe[(Long, Long)] = {
    input
      .map((_, 1L))
      .group
      .sum // (key, total count per key)
      .toTypedPipe
      // Every distinct key occurs exactly once at this point. Grouping by the
      // key again would aggregate nothing, so drop the key and fold all
      // (count, 1) pairs into one global group instead.
      .map { case (_, countForKey) => (countForKey, 1L) }
      .groupAll
      .sum // ((), (total count, distinct count))
      .values
  }