in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [91:109]
/**
 * Sessionizes raw log events with Spark.
 *
 * Events are grouped per user (a full shuffle — Spark has no built-in
 * secondary sort), ordered by timestamp on the reduce side, and then split
 * into sessions wherever the gap between consecutive events exceeds
 * `gapDuration`. Each session becomes one [[Session]] record carrying the
 * user, the session span (last minus first timestamp), and the event count.
 */
def spark(input: RDD[LogEvent]): RDD[Session] = {
  // Shuffle all items for a user to one place; value order is NOT guaranteed.
  val eventsByUser = input.groupBy(_.user)

  // Restore temporal order, then cut into sessions. `timeSeries`/`session`
  // is the generic version of `SessionIterator` from `scio-extra`.
  val sessionsByUser = eventsByUser.flatMapValues { events =>
    val ordered = events.toList.sortBy(_.timestamp)
    ordered.iterator
      .timeSeries(_.timestamp)
      .session(gapDuration)
  }

  // Collapse each (user, session items) pair into a summary record.
  // Sessions are non-empty by construction, so head/last are safe.
  sessionsByUser.map { kv =>
    val (user, items) = kv
    Session(user, items.last.timestamp - items.head.timestamp, items.size)
  }
}