in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [67:78]
/**
 * Top-K ratings per user, computed with Algebird's aggregator support on Spark.
 *
 * Ratings are keyed by `user`. Each user's ratings are then folded through an
 * Algebird `sortedReverseTake` aggregator built with `topK` and an ordering on
 * `score`, which yields a `Seq[Rating]` per user. Those per-user sequences are
 * flattened back into a single RDD of ratings.
 *
 * @param input ratings to rank per user
 * @return the ratings selected by the per-user aggregator, flattened into one RDD
 */
def sparkWithAlgebird(input: RDD[Rating]): RDD[Rating] = {
  import com.twitter.algebird.Aggregator
  import com.twitter.algebird.spark._

  // Ordering on score, handed explicitly to the aggregator's Ordering parameter list.
  val byScore: Ordering[Rating] = Ordering.by(_.score)
  val topPerUser = Aggregator.sortedReverseTake[Rating](topK)(byScore)

  val ratingsByUser = input.keyBy(_.user)
  // One `Seq[Rating]` per user key.
  val rankedPerUser = ratingsByUser.algebird.aggregateByKey(topPerUser)
  // Drop the user key and emit each selected rating individually.
  rankedPerUser.flatMap { case (_, ratings) => ratings }
}