def scio()

in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [72:88]


  /**
   * Sessionize log events per user with Scio.
   *
   * Events are grouped by `user`. Each user's events are sorted by `timestamp` and split into
   * sessions with scio-extra's `timeSeries(...).session(gapDuration)` iterator. Each session is
   * then summarized as a [[Session]] of (user, last timestamp - first timestamp, event count).
   *
   * @param input raw log events
   * @return one [[Session]] per session produced for each user
   */
  def scio(input: SCollection[LogEvent]): SCollection[Session] = {
    input
      // Attach event-time timestamps to each element. The Beam model does NOT guarantee any
      // ordering of the values inside a GroupByKey, so we must not rely on this (or on the
      // runner) to deliver a user's events in timestamp order; they are sorted explicitly below.
      .timestampBy(e => new Instant(e.timestamp))
      // No secondary sort in Scio, shuffle all items
      .groupBy(_.user)
      .flatMapValues { events =>
        // `timeSeries`/`session` expect input already ordered by timestamp. Sort the user's
        // events in memory; `sortBy` is stable, so equal timestamps keep their grouped order.
        // Trade-off: this materializes each user's full event list.
        events.toVector
          .sortBy(_.timestamp)
          .iterator
          // Generic version of `SessionIterator` from `scio-extra`
          .timeSeries(_.timestamp)
          .session(gapDuration)
      }
      // Map over each (user, session items). Items are in timestamp order, so
      // last - head is the session's time span.
      .map { case (user, items) =>
        Session(user, items.last.timestamp - items.head.timestamp, items.size)
      }
  }