def scaldingNaive()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [33:52]


  /**
   * Computes the average user age per track by joining log events with user metadata.
   *
   * Both inputs are keyed by their `user` field and joined with `Grouped#join`, which is an
   * inner join. Log events whose user has no metadata record are dropped. If a user has several
   * metadata records, every pairing contributes to the average.
   *
   * @param left  log events; each contributes its `track` and its `user` as the join key
   * @param right user metadata; each contributes its `age`, keyed by `user`
   * @return one `(track, averageAge)` pair per track that appears in the joined data
   */
  def scaldingNaive(
    left: TypedPipe[LogEvent],
    right: TypedPipe[UserMeta]
  ): TypedPipe[(String, Double)] = {
    import com.twitter.algebird.AveragedValue

    // Key both sides on the shared user field so they can be joined.
    val eventsByUser = left.groupBy(_.user)
    val metaByUser = right.groupBy(_.user)

    // After the join the user key is no longer needed; keep (track, age) only.
    val ageByTrack = eventsByUser
      .join(metaByUser)
      .values
      .map { case (event, meta) => event.track -> meta.age.toDouble }

    // Re-key on track and fold each track's ages into a running mean.
    ageByTrack.group
      .aggregate(AveragedValue.aggregator)
      .toTypedPipe
  }