def histogram()

in scio-core/src/main/scala/com/spotify/scio/values/DoubleSCollectionFunctions.scala [108:175]


  /**
   * Computes a histogram of this collection's values against caller-supplied bucket boundaries.
   *
   * The boundaries are lifted into a one-element [[SCollection]] and handed to the shared
   * implementation, which turns them into a bucketing side input.
   *
   * @param buckets bucket boundaries; must contain at least two elements (checked when the
   *                pipeline runs). NOTE(review): presumably expected to be sorted ascending —
   *                confirm against `BucketFunctions`.
   * @param evenBuckets when `true`, only `buckets.head`, `buckets.last` and the bucket count are
   *                    used (via `fastBucketFunction`); otherwise the full boundary array is used.
   * @return a single-element collection holding `buckets.length - 1` per-bucket counts
   */
  def histogram(buckets: Array[Double], evenBuckets: Boolean = false): SCollection[Array[Long]] = {
    val boundaries = self.context.parallelize(Seq(buckets))
    histogramImpl(boundaries, evenBuckets)
  }

  /**
   * Shared histogram implementation for a (singleton) collection of bucket boundaries.
   *
   * Each value of `self` is mapped to a bucket index, occurrences per index are counted, and the
   * sparse counts are expanded into one dense array of length `boundaries.length - 1`.
   *
   * @param buckets single-element collection holding the bucket boundaries; the boundary array
   *                must have at least two elements
   * @param evenBuckets when `true`, use `fastBucketFunction` driven by (first, last, count);
   *                    otherwise use `basicBucketFunction` over the full boundary array
   * @return a single-element collection with the per-bucket counts; all zeros if `self` is empty
   */
  private def histogramImpl(
    buckets: SCollection[Array[Double]],
    evenBuckets: Boolean
  ): SCollection[Array[Long]] = {
    import com.spotify.scio.values.BucketFunctions._

    // Map buckets into a side input describing which bucket function to use. The choice is
    // made here, once, instead of being re-derived from the raw boundaries downstream.
    val side = buckets.map { b =>
      require(b.length >= 2, "buckets array must have at least two elements")
      val bucketParams: Either[(Double, Double, Int), Array[Double]] =
        if (evenBuckets) Left((b.head, b.last, b.length - 1)) else Right(b)
      bucketParams
    }.asSingletonSideInput

    // Number of buckets: n boundaries delimit n - 1 buckets.
    val bSide = buckets.map(_.length - 1).asSingletonSideInput

    // Sparse (bucket index, count) pairs. There is at most one pair per bucket, so this side
    // input is never larger than the final dense array. Building it sparsely avoids creating a
    // full-length array per distinct bucket and summing those arrays pairwise.
    val countsSide = self
      .withSideInputs(side)
      .flatMap { (x, c) =>
        // Map values to buckets; values outside every bucket produce no index
        val bucketFunction = c(side) match {
          case Left(p)  => (fastBucketFunction _).tupled(p)
          case Right(b) => basicBucketFunction(b) _
        }
        bucketFunction(x).iterator
      }
      .toSCollection
      .countByValue // Count occurrences of each bucket
      .asListSideInput

    // Densify once, driven by a one-element collection so that exactly one result is emitted
    // even when `self` is empty (the counts side input is then empty and every bucket is zero).
    self.context
      .parallelize(Seq(0))
      .withSideInputs(bSide, countsSide)
      .map { (_, c) =>
        val hist = Array.fill(c(bSide))(0L)
        c(countsSide).foreach { case (bin, count) => hist(bin) = count }
        hist
      }
      .toSCollection
  }