in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [69:87]
/**
 * Joins play and save events per user and scans each user's combined,
 * time-ordered event list for neighboring events that
 * `detectPlaySaveSequence` accepts.
 *
 * @param playEvents play log events; each is keyed by its `user` field
 * @param saveEvents save log events; each is keyed by its `user` field
 * @return one `(user, value)` pair per value that `detectPlaySaveSequence`
 *         emits for a window of that user's events (presumably the matched
 *         track -- confirm against `detectPlaySaveSequence`)
 */
def scio(
  playEvents: SCollection[LogEvent],
  saveEvents: SCollection[LogEvent]
): SCollection[(String, String)] = {
  // Key every event by user and tag it with the log it came from, so the
  // origin is still known once both logs are merged into one list.
  val taggedPlays = playEvents.map(event => (event.user, ("play", event)))
  val taggedSaves = saveEvents.map(event => (event.user, ("save", event)))

  taggedPlays
    .cogroup(taggedSaves)
    // Per user: one `Iterable` of tagged plays and one of tagged saves
    .flatMapValues { case (userPlays, userSaves) =>
      // Plays are concatenated before saves and `sortBy` is stable, so a play
      // and a save with the same timestamp stay in play-then-save order.
      val timeline =
        (userPlays ++ userSaves).toList.sortBy { case (_, event) => event.timestamp }

      // Windows of two neighboring events. A timeline holding a single event
      // yields one window containing just that event.
      timeline
        .sliding(2)
        .flatMap(detectPlaySaveSequence)
    }
}