in finagle-exp/src/main/scala/com/twitter/finagle/exp/fiber_scheduler/FiberSchedulerOptimizer.scala [15:103]
def apply(
minThreads: Int,
maxThreads: Int,
startWorker: () => Promise[Unit],
stopWorker: () => Promise[Unit],
pool: ThreadPoolExecutor,
completions: LongAdder,
rejections: LongAdder,
workers: CopyOnWriteArrayList[Worker]
): Optimizer = {
import Optimizer._
import Config.Optimizer._
val cgroup = new Cgroup()
// Memory cliff to address potential issues with container OOM
// due to the use of off-heap memory for the thread stacks
val memoryCliff =
Limit
.ifReaches("memory_usage_percent", memoryCliffMaxUsagePercent) {
cgroup.memoryUsagePercent()
}.withMaxTries(memoryCliffMaxTries, memoryCliffMaxTriesExpiration)
.withCleanup {
// triggers inactive threads expiration
pool.allowCoreThreadTimeOut(true)
pool.allowCoreThreadTimeOut(false)
}
// Uses cgroup files to detect CPU throttling and limit the number of
// threads accordingly.
val cpuThrottlingCliff =
Limit
.ifIncreases("cpu_nr_throttled") {
cgroup.cpuNrThrottled()
}.withMaxTries(cpuThrottlingCliffMaxTries, cpuThrottlingCliffMaxTriesExpiration)
// Avoids reducing the number of threads if the scheduler is rejecting tasks.
val rejectionsValley =
Limit
.ifIncreases("rejections")(rejections.sum())
.withMaxTries(rejectionsValleyMaxTries, rejectionsValleyMaxTriesExpiration)
// Detects workers running blocking tasks and avoids reducing the number of
// threads if the percentage of blocked workers are above the threshold.
val blockedWorkersValley =
Limit
.ifReaches("bocked_workers", blockedWorkersValleyMaxPercent) {
var blocked = 0
workers.forEach { w =>
if (w.isBlocked)
blocked += 1
}
(blocked.toDouble / workers.size()) * 100
}.withMaxTries(blockedWorkersValleyMaxTries, blockedWorkersValleyMaxTriesExpiration)
// Detects if the scheduler is mostly idle. The optimizer doesn't make adapt
// decisions if the scheduler is is considered idle.
val isIdle = () => {
var active = 0D
workers.forEach { w =>
if (w.load() > 0)
active += 1
}
(active / workers.size) * 100 < idleThresholdPercent
}
Optimizer(
score = Score.delta(completions.sum()),
cliffLimit = cpuThrottlingCliff.andThen(memoryCliff),
valleyLimit = rejectionsValley.andThen(blockedWorkersValley),
cliffExpiration = cliffExpiration,
valleyExpiration = valleyExpiration,
max = maxThreads,
min = minThreads,
get = () => workers.size(),
up = startWorker,
down = stopWorker,
isIdle,
adaptPeriod = NextPowerOfTwo(optimizerAdaptPeriod),
wavePeriod = NextPowerOfTwo(optimizerWavePeriod),
cycleInterval = {
val p = cgroup.cpuPeriod
if (p.isZero) {
optimizerDefaultCycle
} else {
p
}
}
)
}