in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [57:69]
/**
 * Sessionizes log events with Scalding's typed API.
 *
 * Events are grouped by `user` and ordered by `timestamp`. Each user's ordered
 * value stream is then handed to `SessionIterator`, which chunks it into sessions.
 * Every session is summarized as a `Session` holding:
 * - the user,
 * - the difference between the timestamps of its last and first items,
 * - its item count.
 *
 * NOTE(review): the duration assumes `SessionIterator` emits non-empty sessions
 * whose items keep the timestamp order established by `sortBy`. Confirm this
 * against its definition.
 *
 * @param input log events to sessionize
 * @return one `Session` for each session produced by `SessionIterator`
 */
def scalding(input: TypedPipe[LogEvent]): TypedPipe[Session] = {
  // Hadoop secondary sort: values for each user arrive ordered by timestamp
  // straight out of the shuffle, so no in-memory sort is needed.
  val eventsByUser = input
    .groupBy(_.user)
    .sortBy(_.timestamp)

  // Consume each user's ordered values lazily and cut them into sessions.
  val userSessions = eventsByUser
    .mapValueStream(events => new SessionIterator(events))
    .toTypedPipe

  userSessions.map { pair =>
    val (user, items) = pair
    val duration = items.last.timestamp - items.head.timestamp
    Session(user, duration, items.size)
  }
}