in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [32:41]
/**
 * Keeps, for every user, the `topK` ratings with the highest score.
 *
 * Ratings are grouped by `user`. Each group is reduced with
 * `sortedReverseTake(topK)` using an ordering on `score`, so the largest
 * scores come first. The user key is then discarded and the per-user
 * `Seq[Rating]` results are flattened back into a single pipe.
 *
 * @param input all ratings to rank
 * @return at most `topK` ratings per user
 */
def scalding(input: TypedPipe[Rating]): TypedPipe[Rating] = {
  // "Reverse" take on this ordering keeps the highest-scoring ratings.
  val byScore: Ordering[Rating] = Ordering.by((r: Rating) => r.score)

  // One Seq[Rating] per user key, bounded to topK elements.
  val topPerUser = input
    .groupBy(_.user)
    .sortedReverseTake(topK)(byScore)

  // Discard the user key, then unnest the per-user sequences.
  topPerUser.values.flatten
}