def scio()

in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [44:53]


  def scio(input: SCollection[Rating]): SCollection[Rating] = {
    input
      .keyBy(_.user)
      // Compute top K per key
      .topByKey(topK)(Ordering.by(_.score))
      // Drop user key
      .values
      // Flatten result `Iterable[Rating]`
      .flatten
  }