in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [76:90]
/**
 * Computes the average user age for each track by joining log events with user metadata
 * on the user field.
 *
 * @param left  log events, each supplying a `user` and a `track`
 * @param right user metadata, each supplying a `user` and an `age`
 * @return (track, average age) pairs
 */
def scioNaive(
  left: SCollection[LogEvent],
  right: SCollection[UserMeta]
): SCollection[(String, Double)] = {
  import com.twitter.algebird.AveragedValue

  // Key both inputs by user so that they can be joined on it
  val trackByUser = left.map(event => event.user -> event.track)
  val ageByUser = right.map(meta => meta.user -> meta.age.toDouble)

  // The join produces (user, (track, age)); discarding the user key leaves (track, age),
  // which makes track the new key
  val ageByTrack = trackByUser.join(ageByUser).values

  // Reduce the ages of each track to their average
  ageByTrack.aggregateByKey(AveragedValue.aggregator)
}