in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [90:105]
/**
 * Spark implementation: find neighbouring play/save event pairs per user.
 *
 * Both inputs are keyed by `LogEvent.user` and tagged with their event type ("play" or
 * "save"). They are cogrouped so each user's events end up together, then merged into one
 * timeline ordered by timestamp. Consecutive pairs of that timeline are passed to
 * `detectPlaySaveSequence`.
 *
 * @param playEvents play log events
 * @param saveEvents save log events
 * @return one `(user, value)` element per neighbouring pair accepted by
 *         `detectPlaySaveSequence`; the meaning of `value` is defined by that detector
 *         (presumably a track identifier — confirm)
 */
def spark(playEvents: RDD[LogEvent], saveEvents: RDD[LogEvent]): RDD[(String, String)] = {
  // Map inputs to key-values and add event type information
  val plays = playEvents.map(e => (e.user, ("play", e)))
  val saves = saveEvents.map(e => (e.user, ("save", e)))
  plays
    .cogroup(saves)
    .flatMapValues { case (p, s) =>
      // `Iterable`s of play and save events for the user. Plays are concatenated first and
      // `sortBy` is stable, so on equal timestamps a play precedes a save.
      (p ++ s).toList
        .sortBy(_._2.timestamp)
        // Neighboring pairs
        .sliding(2)
        // `sliding(2)` emits one short window of size 1 when the user has a single event.
        // Drop it so the detector only ever receives genuine pairs.
        .filter(_.size == 2)
        .flatMap(detectPlaySaveSequence)
    }
}