in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [56:73]
/**
 * Average user age per track, computed with a Scalding hash (map-side) join.
 *
 * How the pipeline works:
 *  - Each `LogEvent` is projected down to a `(user, track)` pair.
 *  - Each `UserMeta` is projected down to a `(user, age)` pair, with the age widened to `Double`.
 *  - The two pipes are joined on user with `hashJoin`, passing the metadata side as the argument.
 *  - The joined `(track, age)` values are grouped by track.
 *  - Each group is reduced with Algebird's `AveragedValue` aggregator.
 *
 * NOTE(review): this relies on `hashJoin` being an inner join in Scalding. If so, events whose
 * user has no metadata in `right` do not contribute to any average. Confirm against the
 * Scalding API docs.
 *
 * @param left  log events; the streamed side of the join
 * @param right user metadata; the side handed to `hashJoin` and read by each left-hand mapper
 * @return `(track, average age)` pairs, one per track key produced by the join
 */
def scaldingHashJoin(
  left: TypedPipe[LogEvent],
  right: TypedPipe[UserMeta]
): TypedPipe[(String, Double)] = {
  import com.twitter.algebird.AveragedValue

  // Keep only (user, age) from each metadata record. Materializing it with forceToDisk means
  // every left-hand mapper reads the stored result instead of recomputing this projection.
  val userAges = right.map(meta => meta.user -> meta.age.toDouble).forceToDisk
  // Keep only (user, track) from each log event so the full records are not shuffled.
  val userTracks = left.map(event => event.user -> event.track)

  // Drop the user key after joining, leaving (track, age) pairs.
  val trackAges = userTracks.hashJoin(userAges).values

  trackAges.group
    .aggregate(AveragedValue.aggregator)
    .toTypedPipe
}