def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [32:41]


  /**
   * Scalding implementation: keep the `topK` highest-scoring ratings of each user.
   *
   * Ratings are grouped by `user`, and the `topK` largest by `score` are retained per
   * group with `sortedReverseTake`. The user key is then discarded, and each per-user
   * `Seq[Rating]` is expanded back into individual ratings.
   *
   * @param input pipe of ratings to rank per user
   * @return at most `topK` ratings for every user present in `input`
   */
  def scalding(input: TypedPipe[Rating]): TypedPipe[Rating] = {
    // Ascending by score; sortedReverseTake keeps the largest `topK` elements under it
    // (bounded per-group selection rather than a full sort).
    val byScore: Ordering[Rating] = Ordering.by(_.score)

    val topPerUser = input
      .groupBy(rating => rating.user)
      .sortedReverseTake(topK)(byScore)

    // Drop the user key and unpack each per-user Seq into a flat pipe of ratings.
    topPerUser.values.flatMap(ratings => ratings)
  }