def sparkNaive()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [129:142]


  /**
   * Compute the average listener age for each track by joining log events with user metadata.
   *
   * Both inputs are re-keyed by user and combined with a plain pair-RDD `join`; the user key is
   * then discarded so that ages can be averaged per track.
   *
   * @param left  log events, each contributing a (user, track) pair
   * @param right user metadata, each contributing a (user, age) pair
   * @return (track, average age) pairs
   */
  def sparkNaive(left: RDD[LogEvent], right: RDD[UserMeta]): RDD[(String, Double)] = {
    import com.twitter.algebird.spark._
    import com.twitter.algebird.AveragedValue

    // Key both sides by user so they can be joined
    val trackByUser = left.map(event => (event.user, event.track))
    val ageByUser = right.map(meta => (meta.user, meta.age.toDouble))

    // Joined records have the shape (user, (track, age))
    val joined = trackByUser.join(ageByUser)

    // Discard the user key; what remains is (track, age), now keyed by track
    val ageByTrack = joined.values

    // Reduce the ages of every track to their average
    ageByTrack.algebird.aggregateByKey(AveragedValue.aggregator)
  }