in src/main/scala/com/twitter/stitch/BatchPacking.scala [61:131]
/**
 * Adds whole buckets of keys to `batch` for as long as some bucket fits into the space left,
 * recursing after every bucket that was consumed.
 *
 * The array is scanned front to back and the first live bucket whose size does not exceed
 * `remainingSpaceInBatch` is taken. The array is presumably ordered from largest to smallest
 * bucket (per its name and how the last live slot is treated as "smallest" below), which makes
 * that first hit the largest bucket that fits -- ordering is not verified here.
 *
 * Consumed buckets are replaced by `null` in `remainingSortedKeyBuckets` (the array is mutated
 * in place), and `batch` is appended to in place.
 *
 * @param remainingSortedKeyBuckets buckets still available; `null` entries mark buckets already used
 * @param batch                     the batch being built; keys are appended to it
 * @param remainingSpaceInBatch     how many more keys `batch` may receive
 * @param batchPackingStrategy      when it is `SplitSmallBuckets`, and no bucket fits, and more than
 *                                  one live bucket remains, keys are moved out of the last live
 *                                  bucket to fill the remaining space exactly
 * @return the same `batch` instance that was passed in
 */
def fillBatch[K](
remainingSortedKeyBuckets: Array[java.util.HashSet[K]],
batch: mutable.ArrayBuffer[K],
remainingSpaceInBatch: Int,
batchPackingStrategy: BatchPackingStrategy
): mutable.ArrayBuffer[K] = {
  // Scan for the first live bucket that fits. `liveBucketsSeen` is only a complete count when
  // the scan runs to the end, i.e. when nothing fits -- the only case in which it is consulted.
  val bucketCount = remainingSortedKeyBuckets.length
  var fitting: java.util.HashSet[K] = null
  var liveBucketsSeen = 0
  var idx = 0
  while (fitting == null && idx < bucketCount) {
    val candidate = remainingSortedKeyBuckets(idx)
    if (candidate != null) {
      liveBucketsSeen += 1
      if (candidate.size() <= remainingSpaceInBatch) {
        fitting = candidate
        remainingSortedKeyBuckets(idx) = null // tombstone: this bucket is now used up
      }
    }
    idx += 1
  }

  if (fitting != null) {
    // The bucket is known to fit, so every key goes into the batch without a size check.
    val keys = fitting.iterator()
    while (keys.hasNext()) {
      batch += keys.next()
    }
    val spaceLeft = remainingSpaceInBatch - fitting.size()
    if (spaceLeft == 0) {
      batch
    } else {
      // Keep going until no remaining bucket fits in what is left.
      fillBatch(remainingSortedKeyBuckets, batch, spaceLeft, batchPackingStrategy)
    }
  } else {
    // Nothing fits. Splitting is only attempted when there is space to fill and at least two
    // live buckets remain: with a single oversized bucket left, another batch is needed anyway,
    // so that bucket is kept intact.
    if (remainingSpaceInBatch > 0 && liveBucketsSeen > 1) {
      batchPackingStrategy match {
        case SplitSmallBuckets =>
          // Walk back past tombstones to the last live bucket; one must exist because more
          // than one live bucket was counted above.
          var smallestIdx = bucketCount - 1
          while (smallestIdx >= 0 && remainingSortedKeyBuckets(smallestIdx) == null) {
            smallestIdx -= 1
          }
          val donor = remainingSortedKeyBuckets(smallestIdx).iterator()
          // Move exactly `remainingSpaceInBatch` keys from that bucket into the batch.
          var moved = 0
          while (moved < remainingSpaceInBatch) {
            batch += donor.next()
            donor.remove()
            moved += 1
          }
          // The donor bucket only got smaller and stays in its slot, so the array is not touched.
        case _ => ()
      }
    }
    batch
  }
}