in src/main/scala/com/spotify/bdrc/pipeline/TopItems.scala [32:46]
/**
 * Scalding implementation: the `topK` items ranked by their summed score.
 *
 * Scores are first summed per item. Every `(item, total)` pair is then placed under one
 * `Unit` key, so a single group sees all totals and keeps the `topK` largest by total.
 *
 * @param input ratings whose `item` and `score` fields are aggregated
 * @return the selected `(item, totalScore)` pairs, at most `topK` of them
 */
def scalding(input: TypedPipe[Rating]): TypedPipe[(String, Double)] = {
  // Per-item totals; `sum` combines values with the implicit `Semigroup[Double]`
  val totalsPerItem = input
    .map(rating => (rating.item, rating.score))
    .group
    .sum

  // Rank pairs by their total score only (ties are broken arbitrarily)
  val byTotalScore = Ordering.by[(String, Double), Double](_._2)

  totalsPerItem
    .groupAll                                   // one `Unit` key holds every pair
    .sortedReverseTake(topK)(byTotalScore)      // largest `topK` via a priority queue
    .values                                     // discard the `Unit` key
    .flatten                                    // unpack the `Seq[(String, Double)]`
}