in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [48:66]
/**
 * Joins play and save logs per user and scans each user's combined, time-ordered
 * event stream for matches reported by `detectPlaySaveSequence`.
 *
 * Each event is tagged with its origin ("play" or "save") and keyed by `user`. The
 * two keyed streams are co-grouped, the events of one user are merged and sorted by
 * `timestamp`, and every window produced by `sliding(2)` over that ordering is passed
 * to `detectPlaySaveSequence`.
 *
 * All events of a single user are materialised as an in-memory `List` before sorting.
 *
 * @param playEvents log events from the play log
 * @param saveEvents log events from the save log
 * @return pairs of (user, value emitted by `detectPlaySaveSequence`)
 *         NOTE(review): the emitted value is presumably a track identifier; confirm
 *         against `detectPlaySaveSequence`.
 */
def scalding(
  playEvents: TypedPipe[LogEvent],
  saveEvents: TypedPipe[LogEvent]
): TypedPipe[(String, String)] = {
  // Tag every event with the log it came from and key it by user
  val taggedPlays = playEvents.map(event => (event.user, ("play", event))).group
  val taggedSaves = saveEvents.map(event => (event.user, ("save", event))).group

  taggedPlays
    .cogroup(taggedSaves) { (_, userPlays, userSaves) =>
      // One chronologically ordered timeline of the user's tagged events
      val timeline = (userPlays ++ userSaves).toList
        .sortBy { case (_, event) => event.timestamp }
      // Inspect each window of neighboring events
      timeline.sliding(2).flatMap(detectPlaySaveSequence)
    }
    .toTypedPipe
}