def apply[T]()

in util-core/concurrent-extra/src/main/scala/com/twitter/concurrent/Tx.scala [63:142]


  /**
   * Lift a value into a `Tx`.
   *
   * @param msg the value to wrap
   * @return `const(msg)` when `msg` is non-null; `aborted` when `msg` is `null`
   */
  def apply[T](msg: T): Tx[T] =
    if (msg != null) const(msg) else aborted

  /**
   * A constant `Tx` carrying the `Unit` value, built once with `const(())`
   * and held in this `val` so the same instance is reused on every access.
   */
  val Unit: Tx[Unit] = const(())

  /**
   * Thrown by `ack()`/`nack()` on a `twoParty` participant when the
   * transaction has already reached its final (done) state.
   */
  object AlreadyDone extends Exception("Tx is already done")

  /**
   * Thrown by `ack()`/`nack()` on a `twoParty` participant when that same
   * participant has already ack'd and the other side has not yet responded.
   */
  object AlreadyAckd extends Exception("Tx was already ackd")

  /**
   * Thrown by `ack()`/`nack()` on a `twoParty` participant when that same
   * participant has already nack'd and the other side has not yet responded.
   */
  object AlreadyNackd extends Exception("Tx was already nackd")

  /**
   * Build a pair of linked transactions through which two participants
   * agree on exchanging `msg`.
   *
   * Both returned `Tx`s share a single state machine protected by one lock:
   *
   *  - the first participant to `ack()` receives a pending future that is
   *    resolved when the other participant responds: `Commit` if the other
   *    side acks, `Abort` if it nacks;
   *  - the first participant to `nack()` causes the other side's later
   *    `ack()` to yield `Abort`;
   *  - once both sides have responded the transaction is done.
   *
   * Calling `ack()`/`nack()` again on a participant that has already
   * responded throws `AlreadyAckd`, `AlreadyNackd` or `AlreadyDone`,
   * depending on the current state.
   *
   * @param msg the value handed to the receiver when the transaction commits
   * @return a `Tx` object for each participant, (sender, receiver)
   */
  def twoParty[T](msg: T): (Tx[Unit], Tx[T]) = {
    sealed trait State
    case object Idle extends State
    case class Ackd(who: AnyRef, confirm: Boolean => Unit) extends State
    case class Nackd(who: AnyRef) extends State
    case object Done extends State

    // Shared between both parties; every read and write happens under `mutex`.
    var state: State = Idle
    val mutex = new Object

    class Party[U](payload: U) extends Tx[U] {
      def ack(): Future[Result[U]] = mutex.synchronized {
        state match {
          case Idle =>
            // First responder: park a promise that the other party settles
            // through the callback stored in the state.
            val pending = new Promise[Result[U]]
            val settle: Boolean => Unit = { committed =>
              pending.setValue(if (committed) Commit(payload) else Abort)
            }
            state = Ackd(this, settle)
            pending

          case Ackd(who, confirm) =>
            if (who eq this) throw AlreadyAckd
            // The other side ack'd first: release its promise, then finish.
            confirm(true)
            state = Done
            Future.value(Commit(payload))

          case Nackd(who) =>
            if (who eq this) throw AlreadyNackd
            // The other side nack'd first: this ack can only abort.
            state = Done
            Future.value(Abort)

          case Done =>
            throw AlreadyDone
        }
      }

      def nack(): Unit = mutex.synchronized {
        state match {
          case Idle =>
            state = Nackd(this)

          case Nackd(who) =>
            if (who eq this) throw AlreadyNackd
            state = Done

          case Ackd(who, confirm) =>
            if (who eq this) throw AlreadyAckd
            // Tell the waiting acker that the exchange was aborted.
            confirm(false)
            state = Done

          case Done =>
            throw AlreadyDone
        }
      }
    }

    val sender = new Party(())
    val receiver = new Party(msg)
    (sender, receiver)
  }