def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/JoinLogs.scala [48:66]


  /**
   * Scalding implementation of the play/save log join.
   *
   * Events from both inputs are keyed by `user` and tagged with their type (`"play"` or
   * `"save"`), then cogrouped so each user's events can be ordered by timestamp. Every pair of
   * time-adjacent events is handed to `detectPlaySaveSequence`. Whatever it emits becomes the
   * value side of the output, keyed by user.
   *
   * @param playEvents play events to join
   * @param saveEvents save events to join
   * @return `(user, value)` pairs, one for each value produced by `detectPlaySaveSequence`
   */
  def scalding(
    playEvents: TypedPipe[LogEvent],
    saveEvents: TypedPipe[LogEvent]
  ): TypedPipe[(String, String)] = {
    // Map inputs to key-values and add event type information
    val plays = playEvents.map(e => (e.user, ("play", e))).group
    val saves = saveEvents.map(e => (e.user, ("save", e))).group

    plays
      .cogroup(saves) { (_, p, s) =>
        // Play events (left side) and save events (right side) for this user, merged.
        // Plays come first in the concatenation. `sortBy` is stable, so on equal timestamps
        // a play stays ahead of a save.
        (p ++ s).toList
          .sortBy(_._2.timestamp)
          // Neighboring pairs
          .sliding(2)
          // `sliding(2)` emits one short window when the user has fewer than two events.
          // That is not a pair, so skip it instead of passing it to `detectPlaySaveSequence`.
          .filter(_.size == 2)
          .flatMap(detectPlaySaveSequence)
      }
      .toTypedPipe
  }