in scio-core/src/main/scala/com/spotify/scio/values/DoubleSCollectionFunctions.scala [108:175]
def histogram(buckets: Array[Double], evenBuckets: Boolean = false): SCollection[Array[Long]] = {
  // Lift the locally supplied boundaries into a single-element collection so the
  // shared implementation can consume them the same way as distributed boundaries.
  // The boundaries are validated inside the pipeline (at least two elements are
  // required), and the result has `buckets.length - 1` counts.
  val boundaries = self.context.parallelize(Seq(buckets))
  histogramImpl(boundaries, evenBuckets)
}
/**
 * Shared histogram implementation.
 *
 * Produces a single array of `length - 1` counts, where `length` is the size of the
 * boundaries array held by `buckets`. When no input value lands in any bucket the
 * result is an all-zero array of that size.
 *
 * @param buckets     collection holding the array of bucket boundaries; the array must
 *                    contain at least two elements
 * @param evenBuckets when true, only the first boundary, the last boundary and the bucket
 *                    count are handed to `fastBucketFunction`; otherwise the full array
 *                    is handed to `basicBucketFunction`
 * @return a collection containing the array of per-bucket counts
 */
private def histogramImpl(
  buckets: SCollection[Array[Double]],
  evenBuckets: Boolean
): SCollection[Array[Long]] = {
  import com.spotify.scio.values.BucketFunctions._

  // Turn the boundaries into a side input describing which bucket function to use.
  // Choosing here means the decision is made once, not once per shard.
  val paramsSide = buckets.map { boundaries =>
    require(boundaries.length >= 2, "buckets array must have at least two elements")
    val params: Either[(Double, Double, Int), Array[Double]] =
      Either.cond(
        !evenBuckets,
        boundaries,
        (boundaries.head, boundaries.last, boundaries.length - 1)
      )
    params
  }.asSingletonSideInput

  // Number of bins: one fewer than the number of boundaries
  val binCount = buckets.map(_.length - 1)

  // Map every value to the bucket index reported by the selected bucket function;
  // values for which the function yields nothing are dropped.
  val binIndices = self
    .withSideInputs(paramsSide)
    .flatMap { (value, ctx) =>
      val toBin = ctx(paramsSide) match {
        case Left(evenParams)      => (fastBucketFunction _).tupled(evenParams)
        case Right(allBoundaries)  => basicBucketFunction(allBoundaries) _
      }
      toBin(value).iterator
    }
    .toSCollection

  // Count occurrences of each bucket, then expand each (bucket, count) pair into a
  // full-width array that is zero everywhere except at its own bucket.
  val sparseRows = binIndices.countByValue
    .cross(binCount) // Replicate bucket size
    .map { case ((bin, count), size) =>
      val row = Array.fill(size)(0L)
      row(bin) = count
      row
    }

  // Element-wise sum of the per-bucket arrays
  val hist = sparseRows.reduce { (left, right) =>
    val merged = new Array[Long](left.length)
    var idx = 0
    while (idx < merged.length) {
      merged(idx) = left(idx) + right(idx)
      idx += 1
    }
    merged
  }

  // Workaround since hist may be empty: drive the output from a one-element
  // collection and fall back to an all-zero array when no histogram was produced.
  val sizeSide = binCount.asSingletonSideInput
  val histSide = hist.asListSideInput
  self.context
    .parallelize(Seq(0))
    .withSideInputs(sizeSide, histSide)
    .map { (_, ctx) =>
      ctx(histSide).headOption.getOrElse(Array.fill(ctx(sizeSide))(0L))
    }
    .toSCollection
}