def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/TopItems.scala [32:46]


  /**
   * Scalding version of the top-items computation.
   *
   * Sums `score` per `item`, then keeps the `topK` pairs with the largest
   * summed score across the whole input.
   *
   * @param input ratings to aggregate
   * @return `(item, totalScore)` pairs for the `topK` highest totals
   */
  def scalding(input: TypedPipe[Rating]): TypedPipe[(String, Double)] = {
    // Rank (item, total) pairs by their total score alone.
    val byTotalScore: Ordering[(String, Double)] =
      Ordering.by[(String, Double), Double](_._2)

    // Per-item totals, combined with the implicit `Semigroup[Double]`.
    val totals = input
      .map(rating => rating.item -> rating.score)
      .group
      .sum

    // Send every pair to the single `Unit` key so one group sees all items.
    // That group keeps the `topK` largest totals (priority-queue style).
    val ranked = totals.groupAll.sortedReverseTake(topK)(byTotalScore)

    // Discard the `Unit` key and unpack the resulting `Seq[(String, Double)]`.
    ranked.values.flatten
  }