def spark()

in src/main/scala/com/spotify/bdrc/pipeline/Sessions.scala [91:109]


  /**
   * Sessionize log events per user with Spark.
   *
   * Spark offers no secondary sort, so all of a user's events are shuffled
   * together via `groupBy` and sorted in memory before sessionization.
   */
  def spark(input: RDD[LogEvent]): RDD[Session] = {
    val byUser = input.groupBy(_.user)
    byUser
      .flatMapValues { events =>
        // Shuffle does not preserve ordering, so sort by timestamp first
        val ordered = events.toList.sortBy(_.timestamp)
        ordered.iterator
          // Generic version of `SessionIterator` from `scio-extra`:
          // split the ordered stream into gap-separated sessions
          .timeSeries(_.timestamp)
          .session(gapDuration)
      }
      .map { kv =>
        val (user, items) = kv
        // `session` only emits non-empty groups, so head/last are safe
        val duration = items.last.timestamp - items.head.timestamp
        Session(user, duration, items.size)
      }
  }