in src/main/scala/com/spotify/bdrc/pipeline/TopItemsPerUser.scala [44:53]
def scio(input: SCollection[Rating]): SCollection[Rating] = {
input
.keyBy(_.user)
// Compute top K per key
.topByKey(topK)(Ordering.by(_.score))
// Drop user key
.values
// Flatten result `Iterable[Rating]`
.flatten
}