def sparkWithAlgebird()

in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [67:78]


  /**
   * Keeps, for every user, the `topK` ratings with the highest `score`, using an Algebird
   * [[com.twitter.algebird.Aggregator]] run per key on Spark.
   *
   * @param input ratings to rank, keyed internally by `Rating.user`
   * @return the retained ratings of all users, flattened into a single RDD
   */
  def sparkWithAlgebird(input: RDD[Rating]): RDD[Rating] = {
    import com.twitter.algebird.Aggregator.sortedReverseTake
    import com.twitter.algebird.spark._

    // Ordering on which "top" is decided: larger score ranks higher.
    val byScore: Ordering[Rating] = Ordering.by((r: Rating) => r.score)
    // Per Algebird's `sortedReverseTake`: retains the `topK` greatest ratings under `byScore`,
    // emitted as a `Seq[Rating]`.
    val topKPerUser = sortedReverseTake[Rating](topK)(byScore)

    val keyedByUser = input.keyBy(r => r.user)
    // `.algebird` (from the `spark._` import) enables running an Aggregator per key.
    val topRatingsPerUser = keyedByUser.algebird.aggregateByKey(topKPerUser)

    // Drop the user key and emit each user's retained ratings individually.
    topRatingsPerUser.flatMap { case (_, ratings) => ratings }
  }