def spark()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [90:105]

Joins play and save events by user with `cogroup`, sorts each user's combined events by timestamp, and scans neighboring pairs for play-then-save sequences.

  def spark(playEvents: RDD[LogEvent], saveEvents: RDD[LogEvent]): RDD[(String, String)] = {
    // Map inputs to key-values and add event type information
    val plays = playEvents.map(e => (e.user, ("play", e)))
    val saves = saveEvents.map(e => (e.user, ("save", e)))

    plays
      .cogroup(saves)
      .flatMapValues { case (p, s) =>
        // `Iterable`s of play and save events for the user
        (p ++ s).toList
          .sortBy(_._2.timestamp)
          // Neighboring pairs
          .sliding(2)
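          // Detect play-then-save sequences (helper not shown in this excerpt)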
          .flatMap(detectPlaySaveSequence)
      }
  }
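
The excerpt depends on a `LogEvent` type and a `detectPlaySaveSequence` helper that are not shown. Below is a minimal, self-contained sketch of what they might look like, assuming `LogEvent` carries a user, a track, and a timestamp, and that the helper emits the track whenever a play is immediately followed by a save of the same track; the actual definitions in JoinLogs.scala may differ (for example, by also enforcing a time window). The `main` method exercises the same per-user logic that the pipeline runs inside `flatMapValues`.

  // Hypothetical reconstructions; the real definitions in JoinLogs.scala may differ.
  case class LogEvent(user: String, track: String, timestamp: Long)

  object JoinLogsHelpersSketch {

    // Assumed helper: given a window of up to two neighboring (eventType, event)
    // pairs sorted by timestamp, emit the track when a "play" is immediately
    // followed by a "save" of the same track.
    def detectPlaySaveSequence(pair: Seq[(String, LogEvent)]): Option[String] =
      pair match {
        case Seq(("play", p), ("save", s)) if p.track == s.track => Some(p.track)
        case _                                                    => None
      }

    def main(args: Array[String]): Unit = {
      // The same per-user logic that runs inside `flatMapValues` above.
      val events = List(
        ("play", LogEvent("alice", "track-1", 1L)),
        ("save", LogEvent("alice", "track-1", 2L)),
        ("play", LogEvent("alice", "track-2", 5L))
      )
      val matches = events
        .sortBy(_._2.timestamp)
        .sliding(2)
        .flatMap(detectPlaySaveSequence)
        .toList
      println(matches) // List(track-1)
    }
  }

Note that `cogroup` gathers all of a user's play and save events on a single worker, so the `toList`/`sortBy` step assumes each user's event list fits in memory.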