def scio()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [69:87]


  def scio(
    playEvents: SCollection[LogEvent],
    saveEvents: SCollection[LogEvent]
  ): SCollection[(String, String)] = {
    // Map inputs to key-values and add event type information
    val plays = playEvents.map(e => (e.user, ("play", e)))
    val saves = saveEvents.map(e => (e.user, ("save", e)))

    plays
      .cogroup(saves)
      // `Iterable`s of play and save events for the user
      .flatMapValues { case (p, s) =>
        (p ++ s).toList
          .sortBy(_._2.timestamp)
          // Neighboring pairs
          .sliding(2)
          .flatMap(detectPlaySaveSequence)
      }
  }