in src/main/scala/com/twitter/stitch/Arrow.scala [2129:2207]
override def run[T2 <: T, V](
ts: ArrayBuffer[Try[T2]],
ls: ArrayBuffer[Locals],
tail: Arrow[U, V]
): Stitch[ArrayBuffer[Try[V]]] = {
val len = ts.length
if (len == 0)
return a.run(ts, ls, tail)
val twitterLocals = ts.map(_.map(f))
val uniqueLocals = twitterLocals.toSet
/** Optimal case of all being the same, avoid the excess work of making Arrow compositions and calling into [[ApplyArrow]] */
if (uniqueLocals.size == 1) {
uniqueLocals.head match {
case Return(firstLocal) =>
/** All the same so we can do this without splitting the batch */
Stitch.let(letter)(firstLocal)(a.run(ts, ls)).flatMap { rs => tail.run(rs, ls) }
case t: Throw[_] =>
/**
* [[f]] failed with the same exception for all inputs so we continue execution in an error state without any scoping.
* If the input was already a [[Throw]], keep that it, otherwise it becomes the [[Throw]] from evaluating [[f]] to make the new [[TwitterLocal]] value
*/
a.run(TryBuffer.fill(ts)(t.asInstanceOf[Try[T2]]), ls, tail)
}
} else {
/**
* Non-optimal case, multiple values to scope the local to for different inputs.
* Build the [[Arrow]] that scopes each batch using [[Let]],
* then tuple the [[Arrow]] with the input and defer to [[ApplyArrow]] to do the execution from there
*/
/**
* Mapping from new [[TwitterLocal]] value to an underlying [[Arrow]] to run.
* Inputs are [[Try]]s because we want to run the underlying even if we fail here.
* [[andThenNoReassoc]] is used because these [[Arrow]]s are thrown away after each use.
*/
val twitterLocalsMap: scala.collection.immutable.Map[Try[L], Arrow[Try[T], U]] =
uniqueLocals.toIterator
.map[(Try[L], Arrow[Try[T], U])] {
case twitterLocalReturn: Return[L] =>
/**
* [[lowerFromTry]] to get back to the unwrapped input
* [[Let]] that will have the already computed [[TwitterLocal]]
* value be the same for all inputs efficiently applying to to them all.
*/
(
twitterLocalReturn,
Arrow.lowerFromTry.andThenNoReassoc(
Let[T, U, L](() => twitterLocalReturn.get(), letter, a))
)
case t: Throw[L] =>
/**
* if the input was already a [[Throw]], keep it,
* otherwise it becomes the exception from evaluating [[f]] to make the new [[TwitterLocal]] value,
* then call the underlying [[a]]
*/
(t.asInstanceOf[Try[L]], Arrow.const(t.asInstanceOf[Try[T]]).andThenNoReassoc(a))
}.toMap
val rs = ts.asInstanceOf[ArrayBuffer[Try[(Arrow[Try[T], U], Try[T])]]]
var i = 0
while (i < len) {
// make (Arrow, Try[T]) tuples to send to applyArrow
rs(i) = Return((twitterLocalsMap(twitterLocals(i)), ts(i)))
i += 1
}
/**
* We have already checked the simple case of all the inputs being the same and
* defer to [[ApplyArrow]] to handle the complex case of different values for different inputs
* since it will handle the complexity of executing it as efficiently as possible and remapping inputs
*/
Arrow.applyArrow[Try[T], U]().run(rs, ls, tail)
}
}