def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/FieldStatistics.scala [80:98]


  /**
   * Scalding: reduce the whole pipe of users with `aggregator`.
   *
   * @param input users to summarise
   * @return the aggregated [[UserStats]] exposed as a `TypedPipe`
   */
  def scalding(input: TypedPipe[User]): TypedPipe[UserStats] = {
    val summary: TypedPipe[UserStats] = input.aggregate(aggregator)
    summary
  }

  // ## Scio
  /**
   * Scio: reduce the whole collection of users with `aggregator`.
   *
   * @param input users to summarise
   * @return an `SCollection` holding the aggregated [[UserStats]]
   */
  def scio(input: SCollection[User]): SCollection[UserStats] = {
    val summary: SCollection[UserStats] = input.aggregate(aggregator)
    summary
  }

  // ## Spark
  /**
   * Spark: compute max/min/mean/stdev for `age`, `income` and `score` in a single pass.
   *
   * Calling `.stats()` once per field launches one Spark job per field. Each job
   * re-evaluates `input` from its lineage unless the caller has cached it. Folding all three
   * `StatCounter`s together with `RDD.aggregate` reads the input only once.
   *
   * @param input users to summarise
   * @return per-field statistics taken from `StatCounter` (`stdev` is the population standard
   *         deviation, the same value `.stats()` reports)
   */
  def spark(input: RDD[User]): UserStats = {
    import org.apache.spark.util.StatCounter

    def toStats(s: StatCounter): Stats = Stats(s.max, s.min, s.mean, s.stdev)

    // Spark allows both aggregate functions to modify and return their first argument,
    // so merging into the accumulator's counters in place avoids per-record allocation.
    val zero = (new StatCounter, new StatCounter, new StatCounter)
    val (ageCounter, incomeCounter, scoreCounter) = input.aggregate(zero)(
      (acc, user) => {
        acc._1.merge(user.age.toDouble)
        acc._2.merge(user.income.toDouble)
        acc._3.merge(user.score.toDouble)
        acc
      },
      (left, right) => {
        left._1.merge(right._1)
        left._2.merge(right._2)
        left._3.merge(right._3)
        left
      }
    )

    UserStats(
      age = toStats(ageCounter),
      income = toStats(incomeCounter),
      score = toStats(scoreCounter)
    )
  }