def withMaxSyncConcurrency()

in finagle-exp/src/main/scala/com/twitter/finagle/exp/fiber_scheduler/fiber/Group.scala [71:155]


  /**
   * Builds a [[Group]] that caps how many fibers may hold a permit at once and
   * parks the excess in a queue.
   *
   * A single counter encodes both sides of the bookkeeping: while it is
   * positive it is the number of free permits; once it goes negative its
   * magnitude is the number of fibers that are parked (or about to be parked)
   * in the waiters queue.
   *
   * @param concurrency  initial number of permits
   * @param maxWaiters   maximum number of parked fibers; `Int.MaxValue` turns
   *                     the limit check off
   * @param _trySchedule delegate called by `trySchedule` after a permit has
   *                     been taken; its result tells whether the fiber was accepted
   * @param _schedule    delegate called to hand over a fiber that owns a permit
   * @return the concurrency-limited group
   */
  def withMaxSyncConcurrency(
    concurrency: Int,
    maxWaiters: Int,
    _trySchedule: ForkedFiber => Boolean,
    _schedule: ForkedFiber => Unit
  ): Group =
    new Group {

      // > 0: free permits
      // < 0: magnitude is the number of waiters
      private[this] val state = new AtomicInteger(concurrency)
      private[this] val waiters = new ConcurrentLinkedQueue[ForkedFiber]()

      // True when the observed counter value says the waiters queue is full.
      // The check is skipped entirely when no limit was configured.
      private[this] def atWaiterLimit(observed: Int): Boolean =
        maxWaiters != Int.MaxValue && -observed == maxWaiters

      /**
       * Attempts to run `fiber` right away, or parks it if no permit is free.
       *
       * @return `false` if the delegate refused the fiber or the waiter limit
       *         was hit; `true` if the fiber was handed over or parked
       */
      override def trySchedule(fiber: ForkedFiber): Boolean = {
        var accepted = false
        var settled = false
        // Each pass works on one snapshot of the counter; a lost CAS race
        // leaves `settled` unset and the loop takes a fresh snapshot.
        while (!settled) {
          val observed = state.get()
          if (observed > 0 && state.compareAndSet(observed, observed - 1)) {
            // a permit is now held by this fiber
            accepted = _trySchedule(fiber)
            if (!accepted) {
              // Give the permit back through `suspend` rather than by simply
              // restoring the counter: another thread may have registered a
              // waiter in the meantime and it has to be woken up.
              suspend(fiber)
            }
            settled = true
          } else if (atWaiterLimit(observed)) {
            // waiters queue is full
            accepted = false
            settled = true
          } else if (observed <= 0 && state.compareAndSet(observed, observed - 1)) {
            // no permit available: register as a waiter
            waiters.add(fiber)
            accepted = true
            settled = true
          }
        }
        accepted
      }

      /**
       * Runs `fiber` right away if a permit is free, otherwise parks it.
       *
       * Throws `RejectedExecutionException` when the waiter limit was hit.
       */
      override def schedule(fiber: ForkedFiber): Unit = {
        var settled = false
        // Same retry protocol as `trySchedule`: one counter snapshot per pass.
        while (!settled) {
          val observed = state.get()
          if (observed > 0 && state.compareAndSet(observed, observed - 1)) {
            // a permit is now held by this fiber
            _schedule(fiber)
            settled = true
          } else if (atWaiterLimit(observed)) {
            // waiters queue is full
            throw new RejectedExecutionException("fiber group reached its max waiters limit")
          } else if (observed <= 0 && state.compareAndSet(observed, observed - 1)) {
            // no permit available: register as a waiter
            waiters.add(fiber)
            settled = true
          }
        }
      }

      // The counter is left untouched here: the fiber keeps using the permit
      // it obtained earlier.
      override def activate(fiber: ForkedFiber): Unit =
        _schedule(fiber)

      /**
       * Releases one permit; if the counter showed waiters, the released
       * permit goes straight to the next parked fiber.
       */
      override def suspend(fiber: ForkedFiber): Unit = {
        val before = state.getAndIncrement()
        if (before < 0) {
          // `schedule` / `trySchedule` bump the counter *before* enqueueing the
          // fiber, so the queue can be momentarily empty even though the
          // counter announced a waiter. As in `AsyncSemaphore`, polling until
          // an element shows up is safe because the announcing thread enqueues
          // immediately after its successful state change.
          def nextWaiter(): ForkedFiber = {
            val candidate = waiters.poll()
            if (candidate == null) nextWaiter() else candidate
          }
          _schedule(nextWaiter())
        }
      }
    }