def scio()

in src/main/scala/com/spotify/bdrc/pipeline/TopItems.scala [65:74]


  def scio(input: SCollection[Rating]): SCollection[(String, Double)] = {
    input
      .map(x => (x.item, x.score))
      // Sum values with an implicit `Semigroup[Double]`
      .sumByKey
      // Compute top K as an `Iterable[(String, Double)]`
      .top(topK)(Ordering.by(_._2))
      // Flatten result `Iterable[(String, Double)]`
      .flatten
  }