in src/main/scala/com/twitter/stitch/BatchPacking.scala [157:224]
/**
 * Packs the keys of `keyBuckets` into batches of at most `maxSize` keys.
 *
 * Packing happens in two phases:
 *  1. Every bucket holding at least `maxSize` keys has batches of exactly `maxSize` keys carved
 *     out of it (in the bucket's iteration order) until fewer than `maxSize` keys remain.
 *  1. The non-empty remainders are sorted in place with `setComparator` and repeatedly handed to
 *     `fillBatch` until every remainder slot is `null`.
 *
 * NOTE: the `java.util.HashSet`s inside `keyBuckets` are mutated. Keys placed into full batches
 * are removed from their set. The remainder sets are passed to `fillBatch`, which presumably
 * drains them as well (confirm against `fillBatch`).
 *
 * @param keyCount number of keys to pack; only used to size the result buffer
 *                 (presumably the sum of all bucket sizes — confirm with callers)
 * @param keyBuckets keys grouped by bucket
 * @param maxSize maximum number of keys per batch; must be positive
 * @param batchPackingStrategy strategy forwarded to `fillBatch` for combining remainders
 * @return the full single-bucket batches, followed by the batches produced by `fillBatch`
 * @throws IllegalArgumentException if `maxSize` is not positive
 */
def packBatches[K, B](
  keyCount: Int,
  keyBuckets: mutable.HashMap[B, java.util.HashSet[K]],
  maxSize: Int,
  batchPackingStrategy: BatchPackingStrategy
): mutable.ArrayBuffer[mutable.ArrayBuffer[K]] = {
  // A non-positive maxSize can never form a batch; fail fast instead of draining the buckets.
  require(maxSize > 0, s"maxSize must be positive, got $maxSize")

  // Initial capacity = estimated maximum number of batches, to avoid resizing.
  // keyCount / maxSize bounds the full batches carved out below. The extra term covers the
  // batches fillBatch builds from remainders: one per bucket when buckets are never split, one
  // extra when small buckets may be split. That per-strategy bound is assumed from the strategy
  // semantics. It is only a hint: the buffer still grows if it is exceeded.
  // (Parentheses, not braces: braces would declare an anonymous subclass and ignore the size.)
  val maxBatchCount = batchPackingStrategy match {
    case NeverSplitBuckets => keyCount / maxSize + keyBuckets.size
    case SplitSmallBuckets => keyCount / maxSize + 1
  }
  val batches = new mutable.ArrayBuffer[mutable.ArrayBuffer[K]](maxBatchCount)

  // Remainders (< maxSize keys) of the non-empty buckets. This is an array so it can be sorted
  // in place. It is sized for one remainder per bucket, so slots from remainderCount onwards
  // stay null.
  val remainingKeyBuckets = new Array[java.util.HashSet[K]](keyBuckets.size)
  var remainderCount = 0

  keyBuckets.valuesIterator.foreach { keySet =>
    val iter = keySet.iterator
    // Carve out full batches, removing keys from the bucket as they are taken. A batch buffer is
    // only allocated once a full batch is guaranteed, so no buffer is wasted per bucket.
    while (keySet.size >= maxSize) {
      val batch = new mutable.ArrayBuffer[K](maxSize)
      while (batch.size < maxSize) {
        batch += iter.next()
        iter.remove()
      }
      batches += batch
    }
    if (!keySet.isEmpty) {
      remainingKeyBuckets(remainderCount) = keySet
      remainderCount += 1
    }
  }

  // Sort only the populated prefix [0, remainderCount); the trailing nulls are left untouched.
  java.util.Arrays.sort(
    remainingKeyBuckets,
    0, // inclusive
    remainderCount, // exclusive
    setComparator.asInstanceOf[Comparator[java.util.HashSet[K]]]
  )

  // Pack the remainders into further batches.
  // NOTE(review): this loop only terminates if fillBatch sets the slots it fully consumes to
  // null — confirm against fillBatch.
  while (remainingKeyBuckets.exists(_ != null)) {
    batches += fillBatch(
      remainingKeyBuckets,
      new mutable.ArrayBuffer[K](maxSize),
      maxSize,
      batchPackingStrategy
    )
  }
  batches
}