in src/main/scala/com/spotify/bdrc/pipeline/TopItems.scala [49:62]
def scaldingWithAlgebird(input: TypedPipe[Rating]): TypedPipe[(String, Double)] = {
import com.twitter.algebird.Aggregator.sortedReverseTake
val aggregator = sortedReverseTake[(String, Double)](topK)(Ordering.by(_._2))
input
.map(x => (x.item, x.score))
.group
// Sum values with an implicit `Semigroup[Double]`
.sum
.toTypedPipe
// Aggregate globally into a single `Seq[(String, Double)]`
.aggregate(aggregator)
// Flatten result `Seq[(String, Double)]`
.flatten
}