def scioNaive()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogAndMetadata.scala [76:90]


  def scioNaive(
    left: SCollection[LogEvent],
    right: SCollection[UserMeta]
  ): SCollection[(String, Double)] = {
    import com.twitter.algebird.AveragedValue
    val lhs = left.map(e => (e.user, e.track))
    val rhs = right.map(u => (u.user, u.age.toDouble))
    // Join as (user, (track, age))
    lhs
      .join(rhs)
      // Drop user key to make track as new key in (track, age)
      .values
      // Aggregate average age per track
      .aggregateByKey(AveragedValue.aggregator)
  }