in util-core/concurrent-extra/src/main/scala/com/twitter/concurrent/Offer.scala [229:285]
/**
 * Prepares every offer in `evs` and yields the prepared transaction future
 * selected as the winner; all remaining prepared futures are treated as losers.
 *
 * The winner is the first future that is already defined, scanning in
 * (optionally shuffled) order. If none is defined yet, the winner is the one
 * whose index `Future.selectIndex` reports. Each loser has its transaction
 * nacked should it resolve to a `Return`, and has `LostSynchronization` raised
 * on it.
 *
 * Arrays and `while` loops are used deliberately to keep allocations low.
 *
 * @return the winning prepared transaction future
 */
def prepare(): Future[Tx[T]] = {
  val count = evs.size
  val pending = new Array[Future[Tx[T]]](count)

  // Prepare the offers one by one, in the order `evs` yields them.
  val offers = evs.iterator
  var filled = 0
  while (filled < count) {
    pending(filled) = offers.next().prepare()
    filled += 1
  }

  // Nacks and raises on every future other than the one at `winIdx`,
  // then hands back the future at `winIdx`.
  def settle(winIdx: Int, candidates: Array[Future[Tx[T]]]): Future[Tx[T]] = {
    val chosen = candidates(winIdx)
    var k = 0
    while (k < candidates.length) {
      val other = candidates(k)
      // Reference comparison: an entry that is the very same future as the
      // winner is left alone.
      if (other ne chosen) {
        other.respond {
          case Return(tx) => tx.nack()
          case _ => ()
        }
        other.raise(LostSynchronization)
      }
      k += 1
    }
    chosen
  }

  // A pattern match (rather than `foreach`) keeps this allocation-free.
  // The array is shuffled in place only when a random source was supplied.
  random match {
    case Some(rng) =>
      var unshuffled = count
      while (unshuffled > 1) {
        val last = unshuffled - 1
        val pick = rng.nextInt(unshuffled)
        val held = pending(last)
        pending(last) = pending(pick)
        pending(pick) = held
        unshuffled = last
      }
    case None => ()
  }

  // Advance to the first future that is already defined, if there is one.
  var scan = 0
  while (scan < count && !pending(scan).isDefined) {
    scan += 1
  }

  if (scan < count) {
    settle(scan, pending)
  } else {
    // Nothing is defined yet: let `selectIndex` report which one to use.
    Future.selectIndex(ArraySeq.unsafeWrapArray(pending)).flatMap { idx =>
      settle(idx, pending)
    }
  }
}