in src/main/scala/com/twitter/stitch/Arrow.scala [1375:1529]
override def run[T2 <: T, V](
ts: ArrayBuffer[Try[T2]],
ls: ArrayBuffer[Locals],
tail: Arrow[U, V]
): Stitch[ArrayBuffer[Try[V]]] = {
val numInputs = ts.size
// pairwise with `choices`, indicates how many inputs matched that choice
val inputCounts = new Array[Int](numChoices)
// how many different choices were matched across all input values
var uniqueArrows = 0
// pairwise with `ts`, indicates the index of the choice that the input matched against.
// -1 indicates the input is a `Throw`.
val which = new Array[Int](ts.size)
var hasThrows = false
// index of the last choice that was matched. only used when there is a single
// choice used for all inputs.
var lastMatch = -1
var i = 0
// in the first pass over the inputs, find the matching choice, recording the match
// in `which`. we then increment the input count for that choice. if no choice is matched,
// then the input is converted to a `Throw` with a `MatchError`.
while (i < numInputs) {
ts(i) match {
case Return(x) =>
var found = false
var j = 0
while (j < numChoices && !found) {
val (cond, _) = choices(j)
if (cond(x)) {
found = true
if (inputCounts(j) == 0) uniqueArrows += 1
inputCounts(j) += 1
which(i) = j
lastMatch = j
}
j += 1
}
if (!found) {
ts(i) = Throw(new MatchError(s"choose $x"))
which(i) = -1
}
case _ =>
hasThrows = true
which(i) = -1
}
i += 1
}
uniqueArrows match {
case 0 =>
// all failures, nothing to dispatch
tail.run(ts.asInstanceOf[ArrayBuffer[Try[U]]], ls)
case 1 if !hasThrows =>
// with only a single choosen arrow and Throws to filter out, we can just forward the
// entire input to `arrow`
val (_, arrow) = choices(lastMatch)
arrow.run(ts, ls, tail)
case _ =>
// multiple arrows were chosen, so we need to physically partition the inputs to pass
// to each of the chosen arrows.
val partitionedInputs = new ArrayBuffer[ArrayBuffer[Try[T2]]](numChoices)
val partitionedLocals = new ArrayBuffer[ArrayBuffer[Locals]](numChoices)
var i = 0
// first, allocate correctly sized input buffers for each arrow. if a choice has no
// matched inputs, we reuse an empty buffer.
while (i < numChoices) {
inputCounts(i) match {
case _ if isConstant(i) =>
partitionedInputs += null
partitionedLocals += null
case 0 =>
partitionedInputs += emptyBuffer.asInstanceOf[ArrayBuffer[Try[T2]]]
partitionedLocals += emptyBuffer.asInstanceOf[ArrayBuffer[Locals]]
case size =>
partitionedInputs += new ArrayBuffer[Try[T2]](size)
partitionedLocals += new ArrayBuffer[Locals](size)
}
i += 1
}
// next, iterate over the inputs and place in the correct partitioned input buffer
i = 0
while (i < numInputs) {
which(i) match {
case -1 =>
case idx if isConstant(idx) =>
case idx =>
partitionedInputs(idx) += ts(i)
partitionedLocals(idx) += ls(i)
}
i += 1
}
// next, iterate over the partitions, creating a `Stitch` for each sub-arrow
// invocation.
val stitches = new ArrayBuffer[Stitch[Any]](numChoices)
i = 0
while (i < numChoices) {
stitches += (
inputCounts(i) match {
case _ if isConstant(i) => Stitch.value(null)
case 0 => emptyBufferStitch
case 1 =>
val (_, arrow) = choices(i)
arrow.run(partitionedInputs(i)(0), partitionedLocals(i)(0)).liftToTry
case _ =>
val (_, arrow) = choices(i)
arrow.run(partitionedInputs(i), partitionedLocals(i))
}
)
i += 1
}
// collect the stitches and merge the results back together into a single sequence
Stitch.collectNoCopy(stitches).flatMap { resultsSeq =>
// resultsSeq(j) is null if the jth arrow is constant
val result = ts.asInstanceOf[ArrayBuffer[Try[U]]]
val iters = stitches.asInstanceOf[ArrayBuffer[Iterator[Try[U]]]]
var i = 0
while (i < numChoices) {
inputCounts(i) match {
case _ if isConstant(i) => null
case 0 | 1 => null
case _ => iters(i) = resultsSeq(i).asInstanceOf[ArrayBuffer[Try[U]]].iterator
}
i += 1
}
i = 0
while (i < numInputs) {
which(i) match {
case -1 =>
case idx if isConstant(idx) => result(i) = constants(idx)
case idx if inputCounts(idx) == 1 =>
result(i) = resultsSeq(idx).asInstanceOf[Try[U]]
case idx => result(i) = iters(idx).next
}
i += 1
}
tail.run(result, ls)
}
}
}