in src/main/scala/com/spotify/bdrc/pipeline/FieldStatistics.scala [80:98]
// Scalding variant: reduce the whole input pipe with the `aggregator` in scope and
// return the resulting `UserStats` pipe.
def scalding(input: TypedPipe[User]): TypedPipe[UserStats] = {
  val userStats: TypedPipe[UserStats] = input.aggregate(aggregator)
  userStats
}
// ## Scio
// Scio variant: reduce the whole input collection with the `aggregator` in scope and
// return the resulting `UserStats` collection.
def scio(input: SCollection[User]): SCollection[UserStats] = {
  val userStats: SCollection[UserStats] = input.aggregate(aggregator)
  userStats
}
// ## Spark
def spark(input: RDD[User]): UserStats = {
  // Summarize each field with its own `stats()` call. Computing the fields separately is
  // potentially inefficient if `input` is not cached.
  val ageSummary = input.map(user => user.age).stats()
  val incomeSummary = input.map(user => user.income).stats()
  val scoreSummary = input.map(user => user.score).stats()

  // Repackage each summary as a `Stats`, passing max, min, mean and stdev in that order.
  val ageStats = Stats(ageSummary.max, ageSummary.min, ageSummary.mean, ageSummary.stdev)
  val incomeStats =
    Stats(incomeSummary.max, incomeSummary.min, incomeSummary.mean, incomeSummary.stdev)
  val scoreStats =
    Stats(scoreSummary.max, scoreSummary.min, scoreSummary.mean, scoreSummary.stdev)

  UserStats(age = ageStats, income = incomeStats, score = scoreStats)
}