def scioSideInput()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [94:111]


  /**
   * Joins log events with user metadata by replicating the metadata to every worker as a map
   * side input, then computes the average user age per track.
   *
   * Events whose user has no entry in `right` are dropped (inner-join semantics). They are not
   * counted with a placeholder age, which would bias each affected track's average toward zero.
   *
   * @param left  log events; each contributes its user's age to the event's `track`
   * @param right user metadata; replicated to every worker, so presumably small — TODO confirm
   * @return pairs of (track, average age of users with metadata who produced events on it)
   */
  def scioSideInput(
    left: SCollection[LogEvent],
    right: SCollection[UserMeta]
  ): SCollection[(String, Double)] = {
    import com.twitter.algebird.AveragedValue

    // Convert RHS to a side input of `Map[String, Double]`.
    // NOTE(review): if a user appears more than once in `right`, only one age survives in the
    // map; verify that `user` is unique in the metadata.
    val rhs = right.map(u => (u.user, u.age.toDouble)).asMapSideInput

    // Replicate RHS to each worker
    left
      .withSideInputs(rhs)
      // Look up each event's user via the side-input context; events without metadata emit
      // nothing instead of a fake 0.0 age that would skew the average.
      .flatMap { case (e, sideContext) =>
        sideContext(rhs).get(e.user).map(age => (e.track, age)).toList
      }
      // Convert back to regular SCollection
      .toSCollection
      .aggregateByKey(AveragedValue.aggregator)
  }