def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [57:69]


  /** Groups log events into per-user sessions using Scalding's typed API.
    *
    * Events are keyed by `user` and ordered by `timestamp` before being split into sessions
    * by `SessionIterator`. Each session becomes one `Session` record holding:
    *   - the user,
    *   - the difference between its last and first item timestamps,
    *   - the number of items it contains.
    *
    * NOTE(review): `head`/`last` assume `SessionIterator` never yields an empty session.
    * Confirm against its implementation.
    *
    * @param input raw log events
    * @return one `Session` per session produced for each user
    */
  def scalding(input: TypedPipe[LogEvent]): TypedPipe[Session] = {
    // Key by user; `sortBy` relies on Hadoop secondary sort, so values are ordered during
    // the shuffle rather than buffered in memory.
    val eventsByUser = input
      .groupBy(_.user)
      .sortBy(_.timestamp)

    // Walk each user's sorted events lazily and cut them into sessions.
    val sessionsByUser = eventsByUser
      .mapValueStream(events => new SessionIterator(events))
      .toTypedPipe

    // Summarize every (user, session items) pair.
    sessionsByUser.map { case (user, items) =>
      val end = items.last.timestamp
      val start = items.head.timestamp
      Session(user, end - start, items.size)
    }
  }