def fillBatch[K]()

in src/main/scala/com/twitter/stitch/BatchPacking.scala [61:131]


  /**
   * Greedily tops up `batch` with keys taken from `remainingSortedKeyBuckets`.
   *
   * On each invocation the array is scanned from the front and the first live (non-null)
   * bucket whose size does not exceed `remainingSpaceInBatch` is moved into the batch in
   * full; its array slot is overwritten with `null` as a tombstone. This repeats (via
   * recursion) until the batch is exactly full or no remaining bucket fits.
   *
   * When nothing fits, space is still available, and more than one live bucket remains,
   * the `SplitSmallBuckets` strategy fills the leftover space with keys removed from the
   * last live bucket in the array. Any other strategy leaves the batch as it is. With a
   * single live bucket left, the bucket is kept intact because another batch is needed
   * for it regardless.
   *
   * The logic relies on the array being ordered so that the first fitting bucket found by
   * a front-to-back scan is the largest fitting one and the last live entry is the smallest.
   *
   * @param remainingSortedKeyBuckets buckets still to be packed; mutated in place (consumed
   *                                  buckets become `null`, a split bucket loses keys)
   * @param batch                     the batch under construction; keys are appended to it
   * @param remainingSpaceInBatch     how many more keys `batch` may accept
   * @param batchPackingStrategy      decides whether a bucket may be split to fill the batch
   * @return the same `batch` instance that was passed in
   */
  def fillBatch[K](
    remainingSortedKeyBuckets: Array[java.util.HashSet[K]],
    batch: mutable.ArrayBuffer[K],
    remainingSpaceInBatch: Int,
    batchPackingStrategy: BatchPackingStrategy
  ): mutable.ArrayBuffer[K] = {
    // Front-to-back scan for the first live bucket that fits into the remaining space.
    // Live buckets are counted along the way; the count is only complete (and only used)
    // when the scan reaches the end without finding a fit.
    val bucketCount = remainingSortedKeyBuckets.length
    var fitIndex = -1
    var liveBucketsSeen = 0
    var cursor = 0
    while (fitIndex < 0 && cursor < bucketCount) {
      val candidate = remainingSortedKeyBuckets(cursor)
      if (candidate != null) {
        liveBucketsSeen += 1
        if (candidate.size() <= remainingSpaceInBatch) fitIndex = cursor
      }
      cursor += 1
    }

    if (fitIndex >= 0) {
      // A whole bucket fits: tombstone its slot and move every key into the batch.
      val fittingKeys = remainingSortedKeyBuckets(fitIndex)
      remainingSortedKeyBuckets(fitIndex) = null
      val keys = fittingKeys.iterator()
      while (keys.hasNext) {
        batch += keys.next()
      }

      val spaceLeft = remainingSpaceInBatch - fittingKeys.size()
      if (spaceLeft == 0) {
        batch // exactly full, nothing more to do
      } else {
        // keep packing until no remaining bucket is small enough for the leftover space
        fillBatch(remainingSortedKeyBuckets, batch, spaceLeft, batchPackingStrategy)
      }
    } else {
      // No bucket fits. Splitting is only worthwhile when there is space left and at least
      // two live buckets remain; a lone oversized bucket needs its own batch anyway.
      if (remainingSpaceInBatch > 0 && liveBucketsSeen > 1) {
        batchPackingStrategy match {
          case SplitSmallBuckets =>
            // Walk backwards past tombstones to the last live bucket. At least two live
            // buckets exist here, so the walk always stops on a valid index.
            var tail = bucketCount - 1
            while (remainingSortedKeyBuckets(tail) == null) {
              tail -= 1
            }
            val donor = remainingSortedKeyBuckets(tail).iterator()
            // Move exactly `remainingSpaceInBatch` keys from the donor bucket into the batch.
            // The donor did not fit, so it holds more keys than are taken here.
            var moved = 0
            while (moved < remainingSpaceInBatch) {
              batch += donor.next()
              donor.remove()
              moved += 1
            }
          // The donor was already the last live bucket and only shrank, so the array
          // needs no reordering.

          case _ => ()
        }
      }
      batch
    }
  }