def scaldingHashJoin()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [56:73]


  /**
   * Average user age per track, computed by hash-joining log events with user metadata.
   *
   * Both inputs are keyed by user; after the join the user key is discarded and the ages
   * are averaged per track.
   *
   * @param left  log events; each contributes a `(user, track)` pair
   * @param right user metadata; each contributes a `(user, age)` pair and is the side
   *              handed to `hashJoin`
   * @return one `(track, average age)` pair per track
   */
  def scaldingHashJoin(
    left: TypedPipe[LogEvent],
    right: TypedPipe[UserMeta]
  ): TypedPipe[(String, Double)] = {
    import com.twitter.algebird.AveragedValue

    // Project down to just the join key and the track so full events are not shuffled
    val userToTrack = left.map(event => (event.user, event.track))
    // Materialize the projected metadata once, instead of recomputing it in every
    // mapper that processes the left-hand side
    val userToAge = right.map(meta => (meta.user, meta.age.toDouble)).forceToDisk

    // The join yields (user, (track, age)); only the (track, age) payload is needed
    val trackAndAge = userToTrack.hashJoin(userToAge).values

    trackAndAge.group.aggregate(AveragedValue.aggregator).toTypedPipe
  }