override def run[T2 <: T, V]()

in src/main/scala/com/twitter/stitch/Arrow.scala [1375:1529]


    override def run[T2 <: T, V](
      ts: ArrayBuffer[Try[T2]],
      ls: ArrayBuffer[Locals],
      tail: Arrow[U, V]
    ): Stitch[ArrayBuffer[Try[V]]] = {
      val numInputs = ts.size
      // pairwise with `choices`, indicates how many inputs matched that choice
      val inputCounts = new Array[Int](numChoices)
      // how many different choices were matched across all input values
      var uniqueArrows = 0
      // pairwise with `ts`, indicates the index of the choice that the input matched against.
      // -1 indicates the input is a `Throw`.
      val which = new Array[Int](ts.size)
      var hasThrows = false
      // index of the last choice that was matched.  only used when there is a single
      // choice used for all inputs.
      var lastMatch = -1
      var i = 0

      // in the first pass over the inputs, find the matching choice, recording the match
      // in `which`.  we then increment the input count for that choice.  if no choice is matched,
      // then the input is converted to a `Throw` with a `MatchError`.
      while (i < numInputs) {
        ts(i) match {
          case Return(x) =>
            var found = false
            var j = 0

            while (j < numChoices && !found) {
              val (cond, _) = choices(j)
              if (cond(x)) {
                found = true
                if (inputCounts(j) == 0) uniqueArrows += 1
                inputCounts(j) += 1
                which(i) = j
                lastMatch = j
              }

              j += 1
            }

            if (!found) {
              ts(i) = Throw(new MatchError(s"choose $x"))
              which(i) = -1
            }

          case _ =>
            hasThrows = true
            which(i) = -1
        }

        i += 1
      }

      uniqueArrows match {
        case 0 =>
          // all failures, nothing to dispatch
          tail.run(ts.asInstanceOf[ArrayBuffer[Try[U]]], ls)

        case 1 if !hasThrows =>
          // with only a single choosen arrow and Throws to filter out, we can just forward the
          // entire input to `arrow`
          val (_, arrow) = choices(lastMatch)
          arrow.run(ts, ls, tail)

        case _ =>
          // multiple arrows were chosen, so we need to physically partition the inputs to pass
          // to each of the chosen arrows.
          val partitionedInputs = new ArrayBuffer[ArrayBuffer[Try[T2]]](numChoices)
          val partitionedLocals = new ArrayBuffer[ArrayBuffer[Locals]](numChoices)
          var i = 0

          // first, allocate correctly sized input buffers for each arrow.  if a choice has no
          // matched inputs, we reuse an empty buffer.
          while (i < numChoices) {
            inputCounts(i) match {
              case _ if isConstant(i) =>
                partitionedInputs += null
                partitionedLocals += null
              case 0 =>
                partitionedInputs += emptyBuffer.asInstanceOf[ArrayBuffer[Try[T2]]]
                partitionedLocals += emptyBuffer.asInstanceOf[ArrayBuffer[Locals]]
              case size =>
                partitionedInputs += new ArrayBuffer[Try[T2]](size)
                partitionedLocals += new ArrayBuffer[Locals](size)
            }

            i += 1
          }

          // next, iterate over the inputs and place in the correct partitioned input buffer
          i = 0
          while (i < numInputs) {
            which(i) match {
              case -1 =>
              case idx if isConstant(idx) =>
              case idx =>
                partitionedInputs(idx) += ts(i)
                partitionedLocals(idx) += ls(i)
            }
            i += 1
          }

          // next, iterate over the partitions, creating a `Stitch` for each sub-arrow
          // invocation.
          val stitches = new ArrayBuffer[Stitch[Any]](numChoices)
          i = 0
          while (i < numChoices) {
            stitches += (
              inputCounts(i) match {
                case _ if isConstant(i) => Stitch.value(null)
                case 0 => emptyBufferStitch
                case 1 =>
                  val (_, arrow) = choices(i)
                  arrow.run(partitionedInputs(i)(0), partitionedLocals(i)(0)).liftToTry
                case _ =>
                  val (_, arrow) = choices(i)
                  arrow.run(partitionedInputs(i), partitionedLocals(i))
              }
            )
            i += 1
          }

          // collect the stitches and merge the results back together into a single sequence
          Stitch.collectNoCopy(stitches).flatMap { resultsSeq =>
            // resultsSeq(j) is null if the jth arrow is constant
            val result = ts.asInstanceOf[ArrayBuffer[Try[U]]]
            val iters = stitches.asInstanceOf[ArrayBuffer[Iterator[Try[U]]]]
            var i = 0

            while (i < numChoices) {
              inputCounts(i) match {
                case _ if isConstant(i) => null
                case 0 | 1 => null
                case _ => iters(i) = resultsSeq(i).asInstanceOf[ArrayBuffer[Try[U]]].iterator
              }
              i += 1
            }

            i = 0
            while (i < numInputs) {
              which(i) match {
                case -1 =>
                case idx if isConstant(idx) => result(i) = constants(idx)
                case idx if inputCounts(idx) == 1 =>
                  result(i) = resultsSeq(idx).asInstanceOf[Try[U]]
                case idx => result(i) = iters(idx).next
              }
              i += 1
            }

            tail.run(result, ls)
          }
      }
    }