def sparkBroadcast()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [145:161]


  /**
   * Aggregates user ages per track using a broadcast (map-side) lookup instead of a shuffle join.
   *
   * The `right` side is reduced to a `user -> age` map, collected, and broadcast, so every record
   * of `left` is resolved with an in-memory lookup.
   *
   * @param left  log events; `user` is the lookup key and `track` becomes the output key
   * @param right user metadata supplying `user` and `age`
   * @return one pair per track holding the `AveragedValue` aggregation of the looked-up ages
   */
  def sparkBroadcast(left: RDD[LogEvent], right: RDD[UserMeta]): RDD[(String, Double)] = {
    import com.twitter.algebird.spark._
    import com.twitter.algebird.AveragedValue

    // Materialize the RHS as a user -> age table in driver memory
    val ageByUser = right.map(meta => (meta.user, meta.age.toDouble)).collectAsMap()

    // Ship the table back to the workers through the `SparkContext` of the LHS
    val agesBroadcast = left.context.broadcast(ageByUser)

    // In-memory lookup on each worker; a user absent from the table contributes an age of 0.0
    val trackAges = left.map { event =>
      val age = agesBroadcast.value.getOrElse(event.user, 0.0)
      (event.track, age)
    }

    trackAges.algebird.aggregateByKey(AveragedValue.aggregator)
  }