def scaldingApproxWithAlgebird()

in src/main/scala/com/spotify/bdrc/pipeline/CountDistinctItems.scala [43:52]


  /**
   * Approximate distinct-item count using Algebird's `HyperLogLogAggregator`.
   *
   * Each rating's `item` is encoded as UTF-8 bytes, the bytes are folded through a
   * HyperLogLog size aggregator configured with 12 bits, and the resulting estimate is
   * exposed as a `TypedPipe`.
   *
   * @param input pipe of ratings whose `item` fields are counted
   * @return pipe carrying the aggregated size estimate as a `Double`
   */
  def scaldingApproxWithAlgebird(input: TypedPipe[Rating]): TypedPipe[Double] = {
    import com.twitter.algebird.HyperLogLogAggregator

    // The HyperLogLog aggregator consumes byte arrays, so encode every item first
    val itemBytes = input.map { rating => rating.item.getBytes(Charsets.UTF_8) }

    // Global aggregation: the whole pipe is reduced to one `Double` estimate
    val estimate = itemBytes.aggregate(HyperLogLogAggregator.sizeAggregator(bits = 12))

    estimate.toTypedPipe
  }