in finagle-exp/src/main/scala/com/twitter/finagle/exp/fiber_scheduler/fiber/Group.scala [71:155]
def withMaxSyncConcurrency(
concurrency: Int,
maxWaiters: Int,
_trySchedule: ForkedFiber => Boolean,
_schedule: ForkedFiber => Unit
): Group =
new Group {
// if >= 0, # of permits
// if < 0, # of waiters
private[this] val state = new AtomicInteger(concurrency)
private[this] val waiters = new ConcurrentLinkedQueue[ForkedFiber]()
override def trySchedule(fiber: ForkedFiber): Boolean = {
def loop(): Boolean = {
val s = state.get()
if (s > 0 && state.compareAndSet(s, s - 1)) {
// permit acquired successfully
val ret = _trySchedule(fiber)
if (!ret) {
// roll back state change. It's important to call `suspend`
// instead of just rolling back the state since a new waiter
// could be added by another thread while this thread was
// trying to schedule the fiber.
suspend(fiber)
}
ret
} else if (maxWaiters != Int.MaxValue && -s == maxWaiters) {
// reached max waiters
false
} else if (s <= 0 && state.compareAndSet(s, s - 1)) {
// failed to acquire permit, add waiter
waiters.add(fiber)
true
} else {
// CAS operation failed, retry
loop()
}
}
loop()
}
override def schedule(fiber: ForkedFiber): Unit = {
def loop(): Unit = {
val s = state.get()
if (s > 0 && state.compareAndSet(s, s - 1)) {
// permit acquired successfully
_schedule(fiber)
} else if (maxWaiters != Int.MaxValue && -s == maxWaiters) {
// reached max waiters
throw new RejectedExecutionException("fiber group reached its max waiters limit")
} else if (s <= 0 && state.compareAndSet(s, s - 1)) {
// failed to acquire permit, add waiter
waiters.add(fiber)
} else {
// CAS operation failed, retry
loop()
}
}
loop()
}
override def activate(fiber: ForkedFiber): Unit = {
// no need to update the state, just reuse
// the previously acquired permit
_schedule(fiber)
}
override def suspend(fiber: ForkedFiber): Unit = {
val s = state.getAndIncrement()
if (s < 0) {
var w = waiters.poll()
// accounts for the race condition in `schedule` and `trySchedule`
// when the state is updated to indicate that a waiter is being added
// but the waiter hasn't been added to the waiters queue yet. As in
// `AsyncSemaphore`, it's safe to retry until it succeeds since the
// thread that indicated that a waiter will be added can only proceed
// to adding the waiter immediately after the state change.
while (w == null) {
w = waiters.poll()
}
_schedule(w)
}
}
}