def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/AverageScorePerItem.scala [31:41]


  /**
   * Computes the average `score` of the ratings that share the same `user` key.
   *
   * Each rating is turned into a `(score, 1L)` pair; the pairs are summed per key to
   * obtain `(total score, number of ratings)`, and the total is divided by the count.
   *
   * NOTE(review): the enclosing file is named `AverageScorePerItem`, yet the grouping
   * key used here is `user` -- confirm which key is intended.
   *
   * @param input pipe of ratings to aggregate
   * @return pipe of `(key, average score)` pairs, one per distinct grouping key
   */
  def scalding(input: TypedPipe[Rating]): TypedPipe[(String, Double)] = {
    // Per-key (sum of scores, number of ratings); the element-wise tuple addition is
    // provided by an implicit `Semigroup[(Double, Long)]`.
    val totalsByKey = input
      .groupBy(rating => rating.user)
      .mapValues(rating => (rating.score, 1L))
      .sum

    // A key only exists if at least one rating produced it, so `count` is never zero.
    totalsByKey
      .mapValues { case (total, count) => total / count }
      .toTypedPipe
  }