in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [94:111]
/**
 * Joins log events with user metadata through a map side input, then averages the
 * age of matched users per track.
 *
 * `right` is materialized as a `Map[String, Double]` (user -> age) side input and
 * replicated to every worker. It presumably needs to be small enough to hold in
 * worker memory; confirm for the real dataset sizes.
 *
 * Events whose `user` has no entry in `right` are dropped (inner-join semantics).
 * Substituting a placeholder age such as 0.0 would silently bias every affected
 * track's average toward zero.
 *
 * @param left  log events; `user` is the join key and `track` is the output key
 * @param right user metadata supplying each user's `age`
 * @return `(track, average age)` for every track with at least one matched event
 */
def scioSideInput(
  left: SCollection[LogEvent],
  right: SCollection[UserMeta]
): SCollection[(String, Double)] = {
  import com.twitter.algebird.AveragedValue
  // Convert RHS to a side input of `Map[String, Double]` (user -> age)
  val rhs = right.map(u => (u.user, u.age.toDouble)).asMapSideInput
  // Replicate RHS to each worker
  left
    .withSideInputs(rhs)
    // Look up each event's user via the side input context. `get` yields None for
    // users without metadata, so `flatMap` drops them rather than inventing an age.
    .flatMap { case (e, sideContext) =>
      sideContext(rhs).get(e.user).map(age => (e.track, age))
    }
    // Convert back to regular SCollection
    .toSCollection
    .aggregateByKey(AveragedValue.aggregator)
}