in util-core/concurrent-extra/src/main/scala/com/twitter/concurrent/Tx.scala [63:142]
/**
 * Wrap `msg` in a `Tx`.
 *
 * @param msg the value to wrap; may be `null`
 * @return `aborted` when `msg` is `null`, otherwise `const(msg)`
 */
def apply[T](msg: T): Tx[T] =
  Option(msg).fold[Tx[T]](aborted)(value => const(value))
/**
 * A shared, pre-built `Tx[Unit]`: the result of `const` applied to the
 * unit value `()`.
 */
val Unit: Tx[Unit] = const[Unit](())
/**
 * Thrown by the transactions created by `twoParty` when `ack()` or `nack()`
 * is invoked after the exchange has already reached its final state.
 *
 * This is a singleton, so it mixes in `NoStackTrace`: a stack trace would be
 * captured only once, when the object is first initialised, and would then be
 * reported (misleadingly) for every later throw.
 */
object AlreadyDone
    extends Exception("Tx is already done")
    with scala.util.control.NoStackTrace
/**
 * Thrown by the transactions created by `twoParty` when a party that has
 * already ack'd calls `ack()` or `nack()` again while its ack is still
 * pending.
 *
 * This is a singleton, so it mixes in `NoStackTrace`: a stack trace would be
 * captured only once, when the object is first initialised, and would then be
 * reported (misleadingly) for every later throw.
 */
object AlreadyAckd
    extends Exception("Tx was already ackd")
    with scala.util.control.NoStackTrace
/**
 * Thrown by the transactions created by `twoParty` when a party that has
 * already nack'd calls `ack()` or `nack()` again before the other party has
 * responded.
 *
 * This is a singleton, so it mixes in `NoStackTrace`: a stack trace would be
 * captured only once, when the object is first initialised, and would then be
 * reported (misleadingly) for every later throw.
 */
object AlreadyNackd
    extends Exception("Tx was already nackd")
    with scala.util.control.NoStackTrace
/**
 * Build a pair of linked transactions through which two participants agree
 * on whether `msg` is exchanged.
 *
 * Both returned `Tx` objects share one piece of state, and every transition
 * happens while holding a single lock:
 *
 *  - The first participant to `ack()` receives a pending future. It is
 *    completed with `Commit` once the other participant acks, or with
 *    `Abort` once the other participant nacks.
 *  - A participant that acks after its peer has ack'd receives `Commit`
 *    immediately; one that acks after its peer has nack'd receives `Abort`
 *    immediately.
 *  - Calling `ack()` or `nack()` again on a participant whose own ack or
 *    nack is still outstanding throws `AlreadyAckd` or `AlreadyNackd`
 *    respectively; any call after the exchange has finished throws
 *    `AlreadyDone`.
 *
 * @param msg the value handed to the receiver on commit
 * @return a `Tx` object for each participant, (sender, receiver); the
 *         sender's transaction commits `()`, the receiver's commits `msg`
 */
def twoParty[T](msg: T): (Tx[Unit], Tx[T]) = {
  // Phases of the handshake shared by the two sides.
  sealed trait Phase
  case object Untouched extends Phase
  case class AwaitingPeer(owner: AnyRef, resolve: Boolean => Unit) extends Phase
  case class Refused(owner: AnyRef) extends Phase
  case object Finished extends Phase

  val monitor = new Object
  var phase: Phase = Untouched

  class Side[U](payload: U) extends Tx[U] {
    def ack(): Future[Result[U]] = monitor.synchronized {
      phase match {
        case Untouched =>
          // First to ack: park on a promise that the peer's ack/nack resolves.
          val pending = new Promise[Result[U]]
          phase = AwaitingPeer(
            this,
            committed => pending.setValue(if (committed) Commit(payload) else Abort)
          )
          pending

        case AwaitingPeer(owner, resolve) =>
          if (owner eq this) throw AlreadyAckd
          // The peer ack'd earlier: release it, then commit our own side.
          resolve(true)
          phase = Finished
          Future.value(Commit(payload))

        case Refused(owner) =>
          if (owner eq this) throw AlreadyNackd
          // The peer nack'd earlier: the exchange is aborted.
          phase = Finished
          Future.value(Abort)

        case Finished =>
          throw AlreadyDone
      }
    }

    def nack(): Unit = monitor.synchronized {
      phase match {
        case Untouched =>
          phase = Refused(this)

        case AwaitingPeer(owner, resolve) =>
          if (owner eq this) throw AlreadyAckd
          // The peer is waiting on its ack: tell it the exchange aborted.
          resolve(false)
          phase = Finished

        case Refused(owner) =>
          if (owner eq this) throw AlreadyNackd
          phase = Finished

        case Finished =>
          throw AlreadyDone
      }
    }
  }

  val sender = new Side(())
  val receiver = new Side(msg)
  (sender, receiver)
}