in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [72:88]
/**
 * Groups log events into per-user sessions using Scio.
 *
 * All events of a user are shuffled together with `groupBy` (Scio has no secondary sort).
 * Each user's events are then sorted by `timestamp` and cut into sessions by
 * `.session(gapDuration)`.
 *
 * @param input log events to sessionize
 * @return one [[Session]] per (user, session). It is built from the user, the session duration
 *         (last item's timestamp minus first item's timestamp) and the number of items in the
 *         session.
 */
def scio(input: SCollection[LogEvent]): SCollection[Session] = {
  input
    // Attach each event's own time as its element timestamp.
    .timestampBy(e => new Instant(e.timestamp))
    // No secondary sort in Scio, shuffle all items of a user together
    .groupBy(_.user)
    .flatMapValues { events =>
      // Beam does not guarantee any ordering of grouped values.
      // The duration computed below (last - first) needs ascending timestamps, and the session
      // split presumably compares consecutive items, so sort explicitly.
      // Note: this materializes each user's events in memory.
      events.toVector
        .sortBy(_.timestamp)
        .iterator
        // Generic version of `SessionIterator` from `scio-extra`
        .timeSeries(_.timestamp)
        .session(gapDuration)
    }
    // Map over each (user, session items)
    .map { case (user, items) =>
      Session(user, items.last.timestamp - items.head.timestamp, items.size)
    }
}