def packBatches[K, B]()

in src/main/scala/com/twitter/stitch/BatchPacking.scala [157:224]


  /**
   * Packs the keys held in `keyBuckets` into batches.
   *
   * Works in two phases:
   *  1. Every bucket holding at least `maxSize` keys is drained into full batches of exactly
   *     `maxSize` keys, each batch containing keys from that single bucket.
   *  2. Whatever is left over (buckets that were smaller than `maxSize` to begin with, and the
   *     remainders of the larger ones) is sorted with `setComparator` and handed to `fillBatch`
   *     repeatedly until no bucket remains.
   *
   * NOTE: the sets inside `keyBuckets` are mutated in place. Keys moved into full batches are
   * removed from their set via the iterator, and `fillBatch` receives the same set instances
   * (presumably it drains them as well; confirm against `fillBatch`).
   *
   * @param keyCount used only to pre-size the result buffer; presumably the total number of keys
   *                 across all buckets (TODO confirm against callers)
   * @param keyBuckets keys grouped by bucket; the contained sets are modified by this call
   * @param maxSize number of keys in a full batch; also passed to `fillBatch` as its size limit
   * @param batchPackingStrategy selects the capacity estimate here and is forwarded to `fillBatch`
   * @tparam K key type
   * @tparam B bucket type
   * @return the full single-bucket batches followed by the batches produced by `fillBatch`
   */
  def packBatches[K, B](
    keyCount: Int,
    keyBuckets: mutable.HashMap[B, java.util.HashSet[K]],
    maxSize: Int,
    batchPackingStrategy: BatchPackingStrategy
  ): mutable.ArrayBuffer[mutable.ArrayBuffer[K]] = {
    // Upper bound on the number of batches, used as the initial capacity to avoid resizing.
    // `keyCount / maxSize` bounds the number of full batches (`%` would give the number of
    // leftover keys instead); the second term leaves room for the partial batches.
    val maxBatchCount = batchPackingStrategy match {
      case NeverSplitBuckets => keyCount / maxSize + keyBuckets.size
      case SplitSmallBuckets => keyCount / maxSize + 1
    }

    // batches contains full batches, each with keys from a single bucket.
    // The capacity must be passed as a constructor argument: putting the expression in braces
    // after `new` would create an anonymous subclass and silently discard the computed value.
    val batches = new mutable.ArrayBuffer[mutable.ArrayBuffer[K]](maxBatchCount)

    // using an array so we can do inplace sorting later
    // sized for the number of buckets, so we may have extra nulls at the end
    // indexOfFirstNull is the index of the first null
    val remainingKeyBuckets = new Array[java.util.HashSet[K]](keyBuckets.size)
    var indexOfFirstNull = 0

    // Buckets whose size >= maxSize are broken into batches of maxSize and the remaining key buckets
    keyBuckets.foreach {
      case (_, keySet) =>
        val iter = keySet.iterator
        var batch = new mutable.ArrayBuffer[K](maxSize)
        // number of keys still needed to complete the batch currently being filled
        var remaining = maxSize
        // only keep pulling keys while the bucket can still complete the current batch,
        // so a partially filled batch is never left behind when the loop exits
        while (keySet.size >= remaining) {
          batch += iter.next()
          // removing through the iterator shrinks keySet without invalidating the iteration
          iter.remove()
          remaining -= 1
          if (batch.size == maxSize) {
            // complete batches are removed from buckets
            batches += batch
            batch = new mutable.ArrayBuffer[K](maxSize)
            remaining = maxSize
          }
        }
        if (!keySet.isEmpty) {
          // the remainder in the buckets gets added to batches later
          remainingKeyBuckets(indexOfFirstNull) = keySet
          indexOfFirstNull += 1
        }
    }

    // batches now contains batches from all buckets which were >= maxSize
    // remainingKeyBuckets now contains all the buckets and remainders which were < maxSize

    // sort the remainingKeyBuckets in place to avoid extra allocations
    // only sort the non-null elements [0 to indexOfFirstNull)
    java.util.Arrays.sort(
      remainingKeyBuckets,
      0, // inclusive index
      indexOfFirstNull, // exclusive index
      setComparator.asInstanceOf[Comparator[java.util.HashSet[K]]]
    )

    // add non-complete buckets to batches
    // NOTE(review): this loop only terminates if fillBatch nulls out the array slots of the
    // buckets it consumes; confirm against fillBatch.
    while (remainingKeyBuckets.exists(_ != null)) {
      batches += fillBatch(
        remainingKeyBuckets,
        new mutable.ArrayBuffer[K](maxSize),
        maxSize,
        batchPackingStrategy)
    }

    batches
  }