in scalding-base/src/main/scala/com/twitter/scalding/CumulativeSum.scala [19:96]
/**
 * Implicit enrichment: wraps a pipe of `(key, (sortField, value))` pairs in a
 * [[CumulativeSumExtension]] so the `cumulativeSum` methods can be called directly on the pipe.
 */
implicit def toCumulativeSum[K, U, V](pipe: TypedPipe[(K, (U, V))]): CumulativeSumExtension[K, U, V] =
  new CumulativeSumExtension[K, U, V](pipe)
/**
 * Adds cumulative (running) sum operations to a pipe of `(K, (U, V))` entries, where `K` is the
 * grouping key, `U` is the field the entries are ordered by and `V` is the value being summed.
 */
class CumulativeSumExtension[K, U, V](val pipe: TypedPipe[(K, (U, V))]) {

  /**
   * Computes, independently for every key, the running sum of the `V` values taken in ascending
   * order of `U`.
   *
   * Each input entry `(k, (u, v))` yields one output entry `(k, (u, sum))`, where `sum` combines
   * (via `sg.plus`, accumulated value on the left) the values of all entries of `k` that sort at
   * or before `u`. Only a semigroup is needed: the first entry of a key is emitted unchanged, so
   * no zero element is required.
   *
   * @param sg
   *   how two values are combined
   * @param ordU
   *   ordering of the sort field
   * @param ordK
   *   ordering of the grouping key
   */
  def cumulativeSum(implicit
      sg: Semigroup[V],
      ordU: Ordering[U],
      ordK: Ordering[K]
  ): SortedGrouped[K, (U, V)] =
    pipe.group
      .sortBy { case (u, _) => u }
      // The accumulator is a list of at most one element: empty before the first entry of a key,
      // then the latest (u, runningSum). The empty seed contributes nothing once flattened.
      .scanLeft(Nil: List[(U, V)]) { case (acc, (u, v)) =>
        acc match {
          case List((_, previousSum)) => List((u, sg.plus(previousSum, v)))
          case _                      => List((u, v))
        }
      }
      .flattenValues

  /**
   * An optimization of cumulativeSum for cases when a particular key has many entries. Requires a
   * sortable partitioning of U. Accomplishes the optimization by not requiring all the entries for
   * a single key to go through a single scan. Instead requires the sums of the partitions for a
   * single key to go through a single scan.
   *
   * How it works:
   *   1. every entry is assigned to the partition `partition(u)` and the values are totalled per
   *      `(key, partition)`;
   *   1. per key, the partition totals are scanned in ascending partition order, producing for
   *      each partition the combined total of all partitions before it (its "carry");
   *   1. the carry is added to the entries of its partition as a row without a `U`, which sorts
   *      ahead of every real entry, and a scan per `(key, partition)` then produces the final
   *      running sums.
   *
   * The carry of a partition is added to every entry of that partition, so the result matches the
   * unpartitioned `cumulativeSum` only when `partition` is monotone: `u1 <= u2` must imply
   * `partition(u1) <= partition(u2)`.
   *
   * Values are always combined with the accumulated sum on the left, matching the unpartitioned
   * overload. NOTE(review): the per-partition totals come from `sumByKey`, whose combination
   * order is not controlled here, so a non-commutative semigroup may still see values within one
   * partition's total combined in an unspecified order -- confirm before relying on it.
   *
   * @param partition
   *   maps a sort field to its partition
   * @param ordS
   *   ordering of the partitions
   * @param sg
   *   how two values are combined
   * @param ordU
   *   ordering of the sort field
   * @param ordK
   *   ordering of the grouping key
   */
  def cumulativeSum[S](partition: U => S)(implicit
      ordS: Ordering[S],
      sg: Semigroup[V],
      ordU: Ordering[U],
      ordK: Ordering[K]
  ): TypedPipe[(K, (U, V))] = {
    // For each (key, partition): the combined total of all earlier partitions of that key.
    val sumPerS = pipe
      .map { case (k, (u, v)) => (k, partition(u)) -> v }
      .sumByKey
      .map { case ((k, s), v) => (k, (s, v)) }
      .group
      .sortBy { case (s, _) => s }
      // Accumulator: (total before this partition, total including this partition, partition).
      .scanLeft(None: Option[(Option[V], V, S)]) { case (acc, (s, v)) =>
        acc match {
          case Some((_, previousSum, _)) =>
            Some((Some(previousSum), sg.plus(previousSum, v), s))
          case _ => Some((None, v, s))
        }
      }
      .flatMap { case (k, maybeAcc) =>
        // The scan seed and the first partition of a key have no earlier total: emit nothing.
        for {
          acc <- maybeAcc
          previousSum <- acc._1
        } yield { (k, acc._3) -> (None, previousSum) }
      }

    // Real entries carry Some(u); the carry row carries None and therefore sorts first.
    val summands = pipe
      .map { case (k, (u, v)) =>
        (k, partition(u)) -> (Some(u), v)
      } ++ sumPerS

    summands.group
      .sortBy { case (u, _) => u }
      .scanLeft(None: Option[(Option[U], V)]) { case (acc, (maybeU, v)) =>
        acc match {
          case Some((_, previousSum)) => Some((maybeU, sg.plus(previousSum, v)))
          case _                      => Some((maybeU, v))
        }
      }
      .flatMap { case ((k, _), acc) =>
        // Drops the scan seed and the carry row (no U); keeps one output per real entry.
        for {
          uv <- acc
          u <- uv._1
        } yield {
          (k, (u, uv._2))
        }
      }
  }
}