def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/CountDistinctItems.scala [31:40]


  /**
   * Counts how many different items appear in the ratings, using Scalding.
   *
   * Each rating is projected to its `item`, and duplicate items are dropped. Every
   * remaining item then contributes `1L`, and those ones are summed into the final
   * count.
   *
   * @param input ratings whose distinct items are counted
   * @return a pipe carrying the distinct-item count. NOTE(review): when there is
   *         nothing to sum, the aggregate is presumably empty rather than `0L`,
   *         which would make the resulting pipe empty. Confirm this if callers
   *         expect a zero.
   */
  def scalding(input: TypedPipe[Rating]): TypedPipe[Long] = {
    // De-duplication needs a shuffle.
    val uniqueItems = input.map(rating => rating.item).distinct
    val ones = uniqueItems.map(_ => 1L)
    // `sum` combines the ones through an implicit `Semigroup[Long]`; convert the
    // aggregate back into a `TypedPipe` for the caller.
    ones.sum.toTypedPipe
  }