in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [33:52]
/**
 * Computes the average user age per track by joining log events with user metadata.
 *
 * Both inputs are grouped by their `user` field and joined with Scalding's
 * `join`. Events with no matching metadata therefore produce no output. Each
 * joined pair contributes one (track, age) observation. The observations are
 * then grouped by track and reduced to their mean.
 *
 * @param left  log events; each is keyed by its `user` field
 * @param right user metadata; each is keyed by its `user` field
 * @return one (track, average age) pair per track seen in the joined data
 */
def scaldingNaive(
  left: TypedPipe[LogEvent],
  right: TypedPipe[UserMeta]
): TypedPipe[(String, Double)] = {
  import com.twitter.algebird.AveragedValue

  // Key both sides on the same field so they can be co-grouped.
  val eventsByUser = left.groupBy(_.user)
  val metaByUser = right.groupBy(_.user)

  // The join emits (user, (LogEvent, UserMeta)). The user key is no longer
  // needed, so keep only the pair and project it to (track, age).
  val trackAndAge = eventsByUser
    .join(metaByUser)
    .values
    .map { case (event, meta) => event.track -> meta.age.toDouble }

  // Regroup on track and fold each group's ages into their mean.
  trackAndAge
    .group
    .aggregate(AveragedValue.aggregator)
    .toTypedPipe
}