scheduler/server/load_based_sched_alg.go
package server
import (
"fmt"
"math"
"regexp"
"sort"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/scheduler/domain"
)
const (
under int = iota
over
)
// defaults for the LoadBasedScheduler algorithm: class load percents and the requestor-to-class map
var (
DefaultLoadBasedSchedulerClassPercents = map[string]int32{
"land": 40,
"diff": 25,
"sandbox": 10,
"regression": 17,
"ktf": 3,
"coverage": 2,
"tryout": 2,
"unknown": 1,
}
DefaultRequestorToClassMap = map[string]string{
"land.*": "land",
"diff.*": "diff",
"sandbox.*": "sandbox",
"regression.*": "regression",
"CI.*": "regression",
"jenkins.*": "ktf",
"ktf.*": "ktf",
"coverage.*": "coverage",
"tryout.*": "tryout",
}
DefaultMinRebalanceTime = 4 * time.Minute
MaxTaskDuration = 4 * time.Hour
)
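// LoadBasedAlgConfig holds the parameters that drive the load-based scheduling algorithm: the class load
// percents, the requestor-to-class (regexp) map, and the rebalancing threshold and minimum duration, each
// guarded by its own mutex so it can be updated while the scheduler is running.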
type LoadBasedAlgConfig struct {
classLoadPercents map[string]int
classLoadPercentsMu sync.RWMutex
requestorReToClassMap map[string]string
requestorReToClassMapMU sync.RWMutex
rebalanceThreshold int
rebalanceThresholdMu sync.RWMutex
rebalanceMinDuration time.Duration
rebalanceMinDurationMu sync.RWMutex
classByDescLoadPct []string
stat stats.StatsReceiver
}
// LoadBasedAlg is the scheduling algorithm that computes the list of tasks to start and stop for the
// current iteration of the scheduler loop.
//
// The algorithm uses the classLoadPercents, number of tasks currently running for each class,
// and number of available workers when computing the tasks to start/stop.
//
// the algorithm has 3 main phases: rebalancing, entitlement allocation, loaning allocation.
//
// - the rebalancing phase is only triggered when the number of tasks running for each class is very
// different than the class load percents. When rebalancing is run, the rebalancing computation will
// select the set of tasks that need to be stopped to bring classes that are over their target load
// percents back to the target load percents, and the tasks that can be started to replace the stopped tasks.
// Note: there may still be idle workers after the rebalance tasks are started/stopped. The next
// scheduling iteration will find tasks for these workers.
//
// the entitlement part of the computation identifies the number of tasks to start to bring the number
// of running tasks for each class closer to its entitlement as defined in the classLoadPercents.
//
// the loan part of the computation identifies the number of tasks over a class's entitlement that can
// be started to use up unused workers due to other classes not using their entitlement.
//
// See README.md for more details
//
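// A hypothetical walk-through (numbers are illustrative only): with 100 workers, class A at 70% and class B
// at 30%, suppose A is running 20 tasks (with more waiting) and B is running 75.
// - rebalancing: B is 45% over its target and A is 50% under it; if that spread stays above the configured
//   rebalanceThreshold for longer than rebalanceMinDuration, B's most recently started tasks are stopped to
//   bring it back toward its 30-worker target.
// - entitlement: otherwise, idle workers are first offered to A, which is running far fewer tasks than its
//   70-worker entitlement.
// - loaning: if A cannot use all of its entitlement (not enough waiting tasks), the leftover idle workers
//   are loaned to B beyond its 30-worker target.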
type LoadBasedAlg struct {
config *LoadBasedAlgConfig
jobClasses map[string]*jobClass
totalUnusedEntitlement int
exceededRebalanceThresholdStart time.Time
// local copy of load pcts and requestor map to use during assignment computation
// to insulate the computation from external changes to the configuration
classLoadPercents map[string]int
requestorReToClassMap map[string]string
classByDescLoadPct []string
tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID
nopAllocationsCnt int // count the number of times we've run LBS without allocating any nodes to tasks
}
// NewLoadBasedAlg allocates a new LoadBasedAlg object.
func NewLoadBasedAlg(config *LoadBasedAlgConfig, tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID) *LoadBasedAlg {
lbs := &LoadBasedAlg{
config: config,
jobClasses: map[string]*jobClass{},
exceededRebalanceThresholdStart: time.Time{},
tasksByJobClassAndStartTimeSec: tasksByJobClassAndStartTimeSec,
}
return lbs
}
// jobWaitingTasks holds a job's waiting tasks (in the order they should be started)
type jobWaitingTasks struct {
jobState *jobState
waitingTasks []*taskState
}
// jobClass the structure used by the algorithm when computing the list of tasks to start/stop for each class.
// It tracks a class's load percent, and the current state of jobs/tasks assigned to the class. It also holds
// the intermediate fields used to calculate the tasks to start/stop.
type jobClass struct {
className string
// jobsByNumRunningTasks is a map that bins jobs by their number of running tasks. Given that the algorithm has
// determined it will start n tasks from class A, the tasks selected for starting from class A will give preference
// to jobs with the least number of running tasks.
jobsByNumRunningTasks map[int][]jobWaitingTasks
// the largest key value in the jobsByNumRunningTasks map
maxTaskRunningMapIndex int
// the total number of tasks waiting to start for all jobs in this class
origNumWaitingTasks int
// the total number of tasks running for all jobs in this class
origNumRunningTasks int
// the target % of workers for this class
origTargetLoadPct int
// the original number of workers that should be running this class's tasks (total workers * origTargetLoadPct / 100)
origNumTargetedWorkers int
// number of tasks that can be started (when negative -> number of tasks to stop)
numTasksToStart int
// number of tasks still waiting to be started
numWaitingTasks int
// temporary field to hold intermediate entitled values
tempEntitlement int
// temporary field to hold the normalized load %
tempNormalizedPct int
}
func (jc *jobClass) String() string {
return fmt.Sprintf("%s:TargetLoadPct:%d, origTasksWaiting:%d, origTasksRunning:%d, origTargetWorkers:%d, TasksToStart:%d, remainingWaitingTasks:%d, tempEntitlement:%d, tempNormalizedPct:%d",
jc.className,
jc.origTargetLoadPct,
jc.origNumWaitingTasks,
jc.origNumRunningTasks,
jc.origNumTargetedWorkers,
jc.numTasksToStart,
jc.numWaitingTasks,
jc.tempEntitlement,
jc.tempNormalizedPct)
}
// GetTasksToBeAssigned - the entry point to the load based scheduling algorithm
// The algorithm uses the classLoadPercents, number of tasks currently running for each class
// and number of available workers when computing the tasks to start/stop.
// The algorithm has 3 main phases: rebalancing, entitlement allocation, loaning allocation.
func (lbs *LoadBasedAlg) GetTasksToBeAssigned(jobsNotUsed []*jobState, stat stats.StatsReceiver, cs *clusterState,
jobsByRequestor map[string][]*jobState) ([]*taskState, []*taskState) {
// make local copies of the load pct structures to isolate the algorithm from user updates that may happen
// as the algorithm is running
lbs.classLoadPercents = lbs.LocalCopyClassLoadPercents()
lbs.requestorReToClassMap = lbs.getRequestorToClassMap()
lbs.classByDescLoadPct = lbs.getClassByDescLoadPct()
numWorkers := len(cs.nodes)
lbs.initOrigNumTargetedWorkers(numWorkers)
lbs.initJobClassesMap(jobsByRequestor)
rebalanced := false
var tasksToStop []*taskState
if lbs.getRebalanceMinimumDuration() > 0 && lbs.getRebalanceThreshold() > 0 {
// currentPctSpread is the delta between the highest and lowest delta from target percent loads
currentPctSpread := lbs.getCurrentPercentsSpread(numWorkers)
if currentPctSpread > lbs.getRebalanceThreshold() {
nilTime := time.Time{}
if lbs.exceededRebalanceThresholdStart == nilTime {
lbs.exceededRebalanceThresholdStart = time.Now()
} else if time.Now().Sub(lbs.exceededRebalanceThresholdStart) > lbs.config.rebalanceMinDuration {
tasksToStop = lbs.rebalanceClassTasks(jobsByRequestor, numWorkers)
lbs.exceededRebalanceThresholdStart = time.Time{}
rebalanced = true
}
}
}
if !rebalanced {
// compute the number of tasks to be started for each class
lbs.computeNumTasksToStart(cs.numFree())
}
// add the tasks to be started to the return list
tasksToStart := lbs.buildTaskStartList()
// record the assignment stats
for _, jc := range lbs.jobClasses {
stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedJobClassTasksStarting, jc.className)).Update(int64(jc.numTasksToStart))
stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedJobClassTasksWaiting, jc.className)).Update(int64(jc.origNumWaitingTasks - jc.numTasksToStart))
stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedJobClassTasksRunning, jc.className)).Update(int64(jc.origNumRunningTasks))
stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedJobClassDefinedPct, jc.className)).Update(int64(jc.origTargetLoadPct))
finalPct := int(math.Round(float64(jc.origNumRunningTasks+jc.numTasksToStart) / float64(numWorkers) * 100.0))
stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedJobClassActualPct, jc.className)).Update(int64(finalPct))
}
// log the number of tasks to start/stop (when > 0)
// (also track number of times we came into this code, but didn't return any tasks to start/stop)
if len(tasksToStart) > 0 || len(tasksToStop) > 0 {
log.Debugf("Returning %d start tasks, %d stop tasks, for %d free nodes out of %d total nodes.\n"+
"(%d calls to GetTasksToBeAssigned() with no tasks assigned)\n",
len(tasksToStart), len(tasksToStop), cs.numFree(), len(cs.nodes), lbs.nopAllocationsCnt)
lbs.nopAllocationsCnt = 0
} else {
lbs.nopAllocationsCnt++
}
return tasksToStart, tasksToStop
}
// initOrigNumTargetedWorkers computes the number of workers targeted for each class as per the class's
// original target load pct
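// For example (hypothetical worker count), with 200 workers and the default percents, "diff" (25%) is
// targeted for floor(200*0.25) = 50 workers, "sandbox" (10%) for 20, and so on; the class with the largest
// percent ("land", 40%) is assigned whatever remains after the floor'd targets, which absorbs any rounding slack.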
func (lbs *LoadBasedAlg) initOrigNumTargetedWorkers(numWorkers int) {
lbs.jobClasses = map[string]*jobClass{}
totalWorkers := 0
firstClass := true
for _, className := range lbs.classByDescLoadPct {
jc := &jobClass{className: className, origTargetLoadPct: lbs.classLoadPercents[className], jobsByNumRunningTasks: map[int][]jobWaitingTasks{}}
lbs.jobClasses[className] = jc
if firstClass {
firstClass = false
continue
}
targetNumWorkers := int(math.Floor(float64(numWorkers) * float64(jc.origTargetLoadPct) / 100.0))
jc.origNumTargetedWorkers = targetNumWorkers
totalWorkers += targetNumWorkers
}
lbs.jobClasses[lbs.classByDescLoadPct[0]].origNumTargetedWorkers = numWorkers - totalWorkers
}
// initJobClassesMap builds the map of waiting jobs' requestors to jobClass objects.
// If we see a job whose class % is not defined, assign the job to the class with the
// smallest class load %.
func (lbs *LoadBasedAlg) initJobClassesMap(jobsByRequestor map[string][]*jobState) {
classNameWithLeastWorkers := lbs.classByDescLoadPct[len(lbs.classByDescLoadPct)-1]
// fill the jobClasses map with the state of the running jobs
for requestor, jobs := range jobsByRequestor {
var jc *jobClass
var ok bool
className := GetRequestorClass(requestor, lbs.requestorReToClassMap)
jc, ok = lbs.jobClasses[className]
if !ok {
// the class name was not recognized, use the class with the smallest class load %
lbs.config.stat.Counter(stats.SchedLBSUnknownJobCounter).Inc(1)
jc = lbs.jobClasses[classNameWithLeastWorkers]
log.Errorf("%s is not a recognized job class assigning to class (%s)", className, classNameWithLeastWorkers)
}
if jc.origTargetLoadPct == 0 {
log.Errorf("%s worker allocation (load %% is 0), ignoring %d jobs", requestor, len(jobs))
lbs.config.stat.Counter(stats.SchedLBSIgnoredJobCounter).Inc(1)
continue
}
// organize the class's jobs by the number of tasks currently running (map of jobs' waiting tasks indexed by the number of
// tasks currently running for the job). This will be used in the round robin task selection to start a
// class's task allocation from jobs with least number of running tasks.
// The waiting task array for each job preserves the task order from the job definition, to ensure that jobs' tasks are started
// in the same order as they are defined in the job definition.
// This loop also computes the class's running tasks and waiting task totals
for _, job := range jobs {
_, ok := jc.jobsByNumRunningTasks[job.TasksRunning]
if !ok {
jc.jobsByNumRunningTasks[job.TasksRunning] = []jobWaitingTasks{}
}
waitingTasks := []*taskState{}
for _, taskState := range job.Tasks {
if taskState.Status == domain.NotStarted {
waitingTasks = append(waitingTasks, taskState)
jc.origNumWaitingTasks++
}
}
jc.jobsByNumRunningTasks[job.TasksRunning] = append(jc.jobsByNumRunningTasks[job.TasksRunning], jobWaitingTasks{jobState: job, waitingTasks: waitingTasks})
if job.TasksRunning > jc.maxTaskRunningMapIndex {
jc.maxTaskRunningMapIndex = job.TasksRunning
}
jc.origNumRunningTasks += job.TasksRunning
}
jc.numWaitingTasks = jc.origNumWaitingTasks
}
}
// GetRequestorClass finds the requestorToClassMap entry for requestor.
// Keys in requestorToClassMap are regular expressions.
// If no match is found, return "" for the class name.
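// For example, with DefaultRequestorToClassMap a requestor such as "diff-worker-1" matches the "diff.*"
// pattern and maps to class "diff"; a requestor that matches none of the patterns yields "". Note the
// patterns are unanchored regular expressions, so a match anywhere in the requestor string counts.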
func GetRequestorClass(requestor string, requestorToClassMap map[string]string) string {
for reqRe, className := range requestorToClassMap {
if m, _ := regexp.Match(reqRe, []byte(requestor)); m {
return className
}
}
return ""
}
// computeNumTasksToStart - computes the number of tasks to start for each class.
// Perform the entitlement calculation first and if there are still unallocated workers
// and tasks waiting to start, compute the loan calculation.
func (lbs *LoadBasedAlg) computeNumTasksToStart(numIdleWorkers int) {
var haveUnallocatedTasks bool
numIdleWorkers, haveUnallocatedTasks = lbs.entitlementTasksToStart(numIdleWorkers)
if numIdleWorkers > 0 && haveUnallocatedTasks {
lbs.workerLoanAllocation(numIdleWorkers, false)
}
}
// entitlementTasksToStart computes the number of tasks we can start for each class based on each class's original targeted
// number of workers (origNumTargetedWorkers)
// Note: this is an iterative computation that converges on the number of tasks to start in at most (number of classes) iterations.
//
// 1. compute the outstanding entitlement of a class as the class's orig target load minus (number of tasks running + number of tasks
// to start from the prior iteration) (exception: if a class does not have waiting tasks, its entitlement is 0)
// 2. compute outstanding entitlement % as outstanding entitlement/total of all classes outstanding entitlements
// 3. compute num tasks to start for each class as min(outstanding entitlement % * idle(unallocated) workers, number of the class's waiting tasks)
//
// After completing the 3 steps above, the sum of the number tasks to start may still be < number of idle workers. This will happen
// when a class's waiting task count is less than its outstanding entitlement (the class cannot use all of its entitlement). When this happens,
// the un-allocated idle workers can be distributed across the other classes that have waiting tasks and have not met their full
// entitlement. We compute this by repeating steps 1-3 till all idle workers have been allocated, all waiting tasks have been
// allocated or all classes entitlements have been met. Each iteration either uses up all idle workers, all of a class's waiting tasks
// or fully allocates at least one class's task entitlement. This means that we will not iterate more than the number of classes.
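// A hypothetical example: 100 workers, class A targeted for 60 workers and class B for 40; A is running 50
// tasks with 100 waiting, B is running 30 with 5 waiting, leaving 20 idle workers.
// iteration 1: outstanding entitlements are A=10, B=10 (normalized to 50%/50%); A is allocated 10 tasks but
// B only 5 (it has just 5 waiting tasks), leaving 5 idle workers.
// iteration 2: both outstanding entitlements are now 0, so the loop exits and the 5 remaining idle workers
// fall through to the loan phase (workerLoanAllocation).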
func (lbs *LoadBasedAlg) entitlementTasksToStart(numIdleWorkers int) (int, bool) {
i := 0
haveWaitingTasks := true
for ; i < len(lbs.jobClasses); i++ {
// compute the class's current entitlement: number of tasks we would like to start for each class as per the class's
// target load % and number of waiting tasks. We'll use this to compute normalized entitlement %s below.
totalEntitlements := 0
// get the current entitlements
for _, jc := range lbs.jobClasses {
if (jc.origNumRunningTasks+jc.numTasksToStart) <= jc.origNumTargetedWorkers && jc.numWaitingTasks > 0 {
jc.tempEntitlement = jc.origNumTargetedWorkers - (jc.origNumRunningTasks + jc.numTasksToStart)
} else {
jc.tempEntitlement = 0
}
totalEntitlements += jc.tempEntitlement
}
if totalEntitlements == 0 {
// the task allocations so far have used up every class's entitlement; break
// so we can move on to calculating loaned workers
break
}
// compute normalized entitlement pcts for classes with entitlement > 0
lbs.computeEntitlementPercents()
// compute number of tasks to start (workersToAllocate) as per the normalized entitlement %s
numTasksAllocated := 0
tasksToStart := min(numIdleWorkers, totalEntitlements)
numTasksAllocated, haveWaitingTasks = lbs.getTaskAllocations(tasksToStart)
numIdleWorkers -= numTasksAllocated
if !haveWaitingTasks {
break
}
if numIdleWorkers <= 0 {
break
}
}
return numIdleWorkers, haveWaitingTasks
}
// workerLoanAllocation: we have workers that can be 'loaned' to classes that still have waiting tasks. (That is, a class
// may end up with more running tasks than its entitlement % because other classes are not using their full entitlement.)
// Note: this is an iterative computation that will converge on the number of workers to loan to classes
// For each iteration
// 1. normalize the original target load % to those classes with waiting tasks
// 2. compute each class's allowed loan amount as the number of unallocated workers * the normalized % but not to
// exceed the class's number of waiting tasks
//
// When a class's allowed loan amount is larger than the class's waiting tasks, there will be unallocated workers
// after all the class 'loan' amounts have been calculated. When this happens we repeat the loan calculation till
// there are no unallocated workers left. Each iteration either uses up all idle workers, or all of a class's waiting
// tasks. This means that we will not iterate more than the number of classes.
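// A hypothetical example: 5 idle workers remain after the entitlement phase and only class A (target 60%)
// still has waiting tasks. The loan %'s normalize to 100% for A and 0% for every other class, so all 5 idle
// workers are loaned to A even though that pushes it past its entitlement.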
func (lbs *LoadBasedAlg) workerLoanAllocation(numIdleWorkers int, haveRebalanced bool) {
i := 0
for ; i < len(lbs.jobClasses); i++ {
lbs.computeLoanPercents(numIdleWorkers, haveRebalanced)
// compute loan %'s and allocate idle workers
numTasksToStart, haveWaitingTasks := lbs.getTaskAllocations(numIdleWorkers)
numIdleWorkers -= numTasksToStart
if !haveWaitingTasks {
break
}
if numIdleWorkers <= 0 {
break
}
}
}
// getTaskAllocations - compute the number of tasks to start for each class.
// The classes' pcts at this point are normalized to factor out classes with no waiting tasks.
// The computed number of tasks will not exceed the class's number of waiting tasks.
//
// This function updates each class' numTasksToStart and returns the total number of tasks to
// start and a boolean indicating if any of the classes still have unallocated tasks
func (lbs *LoadBasedAlg) getTaskAllocations(numIdleWorkers int) (int, bool) {
totalTasksToStart := 0
haveWaitingTasks := false
for _, className := range lbs.classByDescLoadPct {
jc := lbs.jobClasses[className]
numTasksToStart := min(jc.numWaitingTasks, ceil(float32(numIdleWorkers)*(float32(jc.tempNormalizedPct)/100.0)))
if jc.numTasksToStart < 0 {
// we've already determined we need to stop tasks for this class (numTasksToStart is negative), but the
// subsequent loan calculation may have determined this class can also get loaners; the += below (outside
// this if) reduces the number of tasks to stop by the loaner amount. We also set the normalized pct to 0
// so the reduction is not repeated if the loan calculation runs again.
jc.tempNormalizedPct = 0.0
}
if (totalTasksToStart + numTasksToStart) > numIdleWorkers {
numTasksToStart = numIdleWorkers - totalTasksToStart
}
jc.numTasksToStart += numTasksToStart
jc.numWaitingTasks -= numTasksToStart
if jc.numWaitingTasks > 0 {
haveWaitingTasks = true
}
totalTasksToStart += numTasksToStart
}
return totalTasksToStart, haveWaitingTasks
}
// computeEntitlementPercents computes each class's current entitled % of the total entitlement (from the
// current entitlement values). The entitlement percents are written to the corresponding jobClass.tempNormalizedPct field.
func (lbs *LoadBasedAlg) computeEntitlementPercents() {
// get the entitlements total
entitlementTotal := 0
for _, jc := range lbs.jobClasses {
entitlementTotal += jc.tempEntitlement
}
// compute the % for all but the class with the largest %. Add up all computed %s and assign
// 100 - sum of % to the class with largest % (this eliminates rounding errors, forcing the
// % to add up to 100%)
totalPercents := 0
firstClass := true
for _, className := range lbs.classByDescLoadPct {
if firstClass {
firstClass = false
continue
}
jc := lbs.jobClasses[className]
jc.tempNormalizedPct = int(math.Floor(float64(jc.tempEntitlement) * 100.0 / float64(entitlementTotal)))
totalPercents += jc.tempNormalizedPct
}
lbs.jobClasses[lbs.classByDescLoadPct[0]].tempNormalizedPct = 100 - totalPercents
}
// computeLoanPercents computes loan %'s as the orig load %'s normalized to exclude classes that don't have waiting tasks and
// adjusted to account for the number of workers already loaned to each class.
// The loan percents are written to the corresponding jobClass.tempNormalizedPct field.
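// A hypothetical example: 100 workers, class A targeted for 60 and class B for 40, with 10 idle workers to
// loan; A is running 70 (already 10 over its target), B is running 20, and both have waiting tasks. The loan
// pool is 10 (already loaned to A) + 10 (idle) = 20, giving raw loan targets of A=12 and B=8; subtracting
// A's existing 10 loaners leaves loan entitlements of A=2 and B=8, so tempNormalizedPct becomes 20 for A and
// 80 for B.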
func (lbs *LoadBasedAlg) computeLoanPercents(numWorkersAvailableForLoan int, haveRebalanced bool) {
// get the sum of all the original load pcts for classes that have waiting tasks
pctsTotal := 0
for _, jc := range lbs.jobClasses {
if jc.numWaitingTasks > 0 {
pctsTotal += jc.origTargetLoadPct
}
}
if pctsTotal == 0 {
// there are no classes that can use a loan
return
}
// normalize the original class % excluding classes that don't have waiting tasks.
// (These pcts will be further adjusted to account for workers already loaned to each class.)
// Also compute the total number of loaned workers (totalLoaners).
normalizedLoanPcts := map[string]float64{}
totalLoaners := 0
for _, className := range lbs.classByDescLoadPct {
jc := lbs.jobClasses[className]
if jc.numWaitingTasks > 0 {
// normalize the class's loan %
normalizedLoanPcts[className] = float64(jc.origTargetLoadPct) / float64(pctsTotal)
} else {
// exclude the class
normalizedLoanPcts[className] = 0.0
}
if !haveRebalanced {
// only add up loaners if we are not rebalancing tasks - if we are rebalancing, then the
// number of loaners is theoretically 0
totalLoaners += int(math.Max(0.0, float64(jc.origNumRunningTasks-jc.origNumTargetedWorkers)))
}
}
// at this point normalizedLoanPcts holds the orig load %'s normalized to exclude classes that don't have
// waiting tasks (classes that can't use a loan)
// update totalLoaners to include the idle workers that are about to be loaned
totalLoaners += numWorkersAvailableForLoan
loanEntitlements := map[string]int{}
totalEntitlements := 0
// compute the loan entitlement for each class: the class's loan % (normalizedLoanPcts) * totalLoaners,
// then subtract the current loaned from the loan entitlement to get the remaining entitlement for each class
for className, jc := range lbs.jobClasses {
// compute the number of workers targeted to loan to this class (out of all currently loaned + workers available for loaning)
entitlement := int(math.Floor(normalizedLoanPcts[className] * float64(totalLoaners)))
currentLoaned := int(math.Max(0, float64(jc.origNumRunningTasks-jc.origNumTargetedWorkers)))
if haveRebalanced && jc.numTasksToStart < 0 {
// if we're going to be stopping tasks, decrease the number currently loaned by the number of tasks we're stopping
currentLoaned += jc.numTasksToStart
}
// reduce that entitlement by the number of workers already loaned
loanEntitlements[className] = int(math.Max(0, float64(entitlement-currentLoaned)))
totalEntitlements += loanEntitlements[className]
}
// re-normalize the loan entitlement %'s for just those classes whose loan entitlement is still > 0
// the available workers will be loaned as per these final %'s
for className, jc := range lbs.jobClasses {
jc.tempNormalizedPct = int(float64(loanEntitlements[className]) / float64(totalEntitlements) * 100.0)
}
}
// buildTaskStartList builds the list of tasks to be started for each jobClass.
func (lbs *LoadBasedAlg) buildTaskStartList() []*taskState {
tasks := []*taskState{}
for _, jc := range lbs.jobClasses {
if jc.numTasksToStart <= 0 {
continue
}
classTasks := lbs.getTasksToStartForJobClass(jc)
tasks = append(tasks, classTasks...)
}
return tasks
}
// getTasksToStartForJobClass get the tasks to start list for a given jobClass. The jobClass's numTasksToStart
// field will contain the number of tasks to start for this job class. The jobClass's jobsByNumRunningTasks is
// a map from an integer value (number of tasks running) to the list of jobs with that number of tasks running
// For a given jobClass, we start adding tasks from the jobs with the least number of tasks running.
// (Note: when a task is started for a job, the job is moved to the ‘next’ bin and placed at the end of that bin’s job list.)
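// A hypothetical example: numTasksToStart is 3 and jobsByNumRunningTasks is {0: [J1, J2], 2: [J3]}. The pass
// over bin 0 starts one task from J1 and one from J2 (moving each to bin 1), then the pass over bin 1 starts
// a second task from J1, reaching 3 tasks; J3, which already has 2 running tasks, is not touched.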
func (lbs *LoadBasedAlg) getTasksToStartForJobClass(jc *jobClass) []*taskState {
tasks := []*taskState{}
startingTaskCnt := 0
// work our way through the class's jobs, starting with jobs with the least number of running tasks,
// till we've added the class's numTasksToStart number of tasks to the task list
for numRunningTasks := 0; numRunningTasks <= jc.maxTaskRunningMapIndex; numRunningTasks++ {
var jobs []jobWaitingTasks
var ok bool
if jobs, ok = jc.jobsByNumRunningTasks[numRunningTasks]; !ok {
// there are no jobs with numRunningTasks running tasks, move on to jobs with more running tasks
continue
}
// jobs contains list of jobs and their waiting taskIds. (Each job in this list has the same number of running tasks.)
// Allocate one task from each job till we've allocated numTasksToStart for the jobClass, or have allocated 1 task from
// each job in this list. As we allocate a task for a job, move the job to the end of jc.jobsByNumRunningTasks[numRunningTasks+1].
for _, job := range jobs {
if len(job.waitingTasks) > 0 {
// get the next task to start from the job
tasks = append(tasks, job.waitingTasks[0])
// move the job to jobsByRunningTasks with numRunningTasks + 1 entry. Note: we don't have to pull it from
// its current numRunningTasks bucket since this is a 1 time pass through the jobsByNumRunningTasks map. The map
// will be rebuilt with the next scheduling iteration
if len(job.waitingTasks) > 1 {
job.waitingTasks = job.waitingTasks[1:]
jc.jobsByNumRunningTasks[numRunningTasks+1] = append(jc.jobsByNumRunningTasks[numRunningTasks+1], job)
if numRunningTasks == jc.maxTaskRunningMapIndex {
jc.maxTaskRunningMapIndex++
}
} else {
job.waitingTasks = []*taskState{}
}
startingTaskCnt++
if startingTaskCnt == jc.numTasksToStart {
return tasks
}
}
}
}
msg := "getTasksToStartForJobClass() fell out of for range jobs loop. We expected it to return because startingTaskCnt == jc.numTasksToStart above " +
"before falling out of the loop. Workers may be left idle till the next scheduling iteration."
log.Warn(msg)
return tasks
}
// buildTaskStopList builds the list of tasks to be stopped.
func (lbs *LoadBasedAlg) buildTaskStopList() []*taskState {
tasks := []*taskState{}
for _, jc := range lbs.jobClasses {
if jc.numTasksToStart >= 0 {
continue
}
classTasks := lbs.getTasksToStopForJobClass(jc)
tasks = append(tasks, classTasks...)
}
return tasks
}
// getTasksToStopForJobClass returns, for the given job class, the abs(numTasksToStart) most recently started
// tasks. (numTasksToStart will be a negative number)
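// For example, if numTasksToStart is -3 this walks tasksByJobClassAndStartTimeSec backward one second at a
// time from the current second, collecting tasks until 3 have been found or the search reaches
// MaxTaskDuration in the past.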
func (lbs *LoadBasedAlg) getTasksToStopForJobClass(jobClass *jobClass) []*taskState {
earliest := time.Now().Add(-1 * MaxTaskDuration)
startTimeSec := time.Now().Truncate(time.Second)
numTasksToStop := jobClass.numTasksToStart * -1
tasksToStop := []*taskState{}
for len(tasksToStop) < numTasksToStop {
key := taskClassAndStartKey{class: jobClass.className, start: startTimeSec}
tasks := lbs.tasksByJobClassAndStartTimeSec[key]
for _, task := range tasks {
tasksToStop = append(tasksToStop, task)
if len(tasksToStop) == numTasksToStop {
break
}
}
startTimeSec = startTimeSec.Add(-1 * time.Second).Truncate(time.Second)
if startTimeSec.Before(earliest) {
break
}
}
lbs.config.stat.Gauge(fmt.Sprintf("%s%s", stats.SchedStoppingTasks, jobClass.className)).Update(int64(numTasksToStop))
return tasksToStop
}
func (lbs *LoadBasedAlg) getNumTasksToStart(className string) int {
return lbs.jobClasses[className].numTasksToStart
}
// rebalanceClassTasks compute the tasks that should be deleted to allow the scheduling algorithm
// to start tasks in better alignment with the original targeted task load percents.
// The function returns the list of tasks to stop and updates the jobClass objects with the number
// of tasks to start
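// A hypothetical example with 100 workers: class A is targeted for 60 but running 80, so its numTasksToStart
// is set to -20 (its 20 most recently started tasks will be stopped); class B is targeted for 40 but running
// 20 with 5 waiting, so all 5 waiting tasks can start. That accounts for 85 workers, and the remaining 15
// are offered as loans via computeLoanPercents/getTaskAllocations.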
func (lbs *LoadBasedAlg) rebalanceClassTasks(jobsByRequestor map[string][]*jobState, totalWorkers int) []*taskState {
log.Info("Rebalancing")
totalTasks := 0
// compute the number of tasks as per each class's entitlement and waiting tasks
// will be negative when a class is over its entitlement
for _, jc := range lbs.jobClasses {
if jc.origNumRunningTasks > jc.origNumTargetedWorkers {
// the class is running more than its entitled number of tasks, numTasksToStart is number of tasks
// to stop to bring back to its entitlement (it will be a negative number)
jc.numTasksToStart = jc.origNumTargetedWorkers - jc.origNumRunningTasks
} else if jc.origNumRunningTasks+jc.origNumWaitingTasks < jc.origNumTargetedWorkers {
// the waiting tasks won't put the class over its entitlement
jc.numTasksToStart = jc.origNumWaitingTasks
} else {
// the number of tasks that could be started to bring the class up to its entitlement
jc.numTasksToStart = jc.origNumTargetedWorkers - jc.origNumRunningTasks
}
totalTasks += jc.origNumRunningTasks + jc.numTasksToStart
}
if totalTasks < totalWorkers {
// some classes are not using their full allocation, we can loan workers
lbs.computeLoanPercents(totalWorkers-totalTasks, true)
lbs.getTaskAllocations(totalWorkers - totalTasks)
}
tasksToStop := lbs.buildTaskStopList()
return tasksToStop
}
// getCurrentPercentsSpread is used to measure how well the current running tasks counts match the target loads.
// It computes each class's difference between the target load pct and actual load pct. The 'spread' value
// is the difference between the min and max differences across the classes. Eg: if 30% was the load target
// for class A and class A is using 50% of the workers, A's pct difference is -20%. If class B has a target
// of 15% and is only using 5% of workers, B's pct difference is 10%. If A and B are the only classes, the
// pctSpread is 25% (5% - -20%).
func (lbs *LoadBasedAlg) getCurrentPercentsSpread(totalWorkers int) int {
if len(lbs.jobClasses) < 2 {
return 0
}
minPct := 0
maxPct := 0
for _, className := range lbs.classByDescLoadPct {
jc := lbs.jobClasses[className]
currPct := int(math.Floor(float64(jc.origNumRunningTasks) * 100.0 / float64(totalWorkers)))
pctDiff := jc.origTargetLoadPct - currPct
if pctDiff < 0 || jc.numWaitingTasks > 0 { // only consider classes using loaned workers, or with waiting tasks
minPct = min(minPct, pctDiff)
maxPct = max(maxPct, pctDiff)
}
}
return maxPct - minPct
}
// getClassByDescLoadPct get a copy of the config's class by descending load pcts
func (lbs *LoadBasedAlg) getClassByDescLoadPct() []string {
lbs.config.classLoadPercentsMu.RLock()
defer lbs.config.classLoadPercentsMu.RUnlock()
copy := []string{}
for _, v := range lbs.config.classByDescLoadPct {
copy = append(copy, v)
}
return copy
}
// getClassLoadPercents return a copy of the ClassLoadPercents converting to int32
func (lbs *LoadBasedAlg) getClassLoadPercents() map[string]int32 {
lbs.config.classLoadPercentsMu.RLock()
defer lbs.config.classLoadPercentsMu.RUnlock()
copy := map[string]int32{}
for k, v := range lbs.config.classLoadPercents {
copy[k] = int32(v)
}
return copy
}
// LocalCopyClassLoadPercents return a copy of the ClassLoadPercents leaving as int
func (lbs *LoadBasedAlg) LocalCopyClassLoadPercents() map[string]int {
lbs.config.classLoadPercentsMu.RLock()
defer lbs.config.classLoadPercentsMu.RUnlock()
copy := map[string]int{}
for k, v := range lbs.config.classLoadPercents {
copy[k] = v
}
return copy
}
// setClassLoadPercents set the scheduler's class load pcts with a copy of the input class load pcts
func (lbs *LoadBasedAlg) setClassLoadPercents(classLoadPercents map[string]int32) {
lbs.config.classLoadPercentsMu.Lock()
defer lbs.config.classLoadPercentsMu.Unlock()
// build a list that orders the classes by descending pct.
keys := []string{}
for key := range classLoadPercents {
keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool {
return classLoadPercents[keys[i]] > classLoadPercents[keys[j]]
})
lbs.config.classByDescLoadPct = keys
// set the load pcts - normalizing them if they don't add up to 100
lbs.config.classLoadPercents = map[string]int{}
pctTotal := 0
for k, val := range classLoadPercents {
lbs.config.classLoadPercents[k] = int(val)
pctTotal += int(val)
}
if pctTotal != 100 {
log.Errorf("LoadBalanced scheduling %%'s don't add up to 100, normalizing them")
lbs.normalizePercents(pctTotal)
}
log.Infof("classLoadPercents are %v", lbs.config.classLoadPercents)
}
// normalizePercents normalizes the class percents to the sum of their values. Only used when the configured
// percents don't add up to 100.
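// For example (hypothetical config), if the configured percents are land:50, diff:30, sandbox:30 (sum 110),
// the two smaller classes are scaled to floor(30/110*100) = 27 each and the largest class (land) gets the
// remainder, 100 - 54 = 46.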
func (lbs *LoadBasedAlg) normalizePercents(percentsSum int) {
totalNormalizedPercents := 0
firstClass := true
normalizedPercents := map[string]int{}
for _, className := range lbs.config.classByDescLoadPct {
if firstClass {
firstClass = false
continue // skip the first class (highest %), it will be given the difference between 100 and the sum of the other %'s
}
classPercent := lbs.config.classLoadPercents[className]
normalizedPercents[className] = int(math.Floor(float64(classPercent) / float64(percentsSum) * 100))
totalNormalizedPercents += normalizedPercents[className]
}
normalizedPercents[lbs.config.classByDescLoadPct[0]] = 100 - totalNormalizedPercents
lbs.config.classLoadPercents = normalizedPercents
}
// getRequestorToClassMap return a copy of the RequestorToClassMap
func (lbs *LoadBasedAlg) getRequestorToClassMap() map[string]string {
lbs.config.requestorReToClassMapMU.RLock()
defer lbs.config.requestorReToClassMapMU.RUnlock()
copy := map[string]string{}
for k, v := range lbs.config.requestorReToClassMap {
copy[k] = v
}
return copy
}
// setRequestorToClassMap set the scheduler's requestor to class map with a copy of the input map
func (lbs *LoadBasedAlg) setRequestorToClassMap(requestorToClassMap map[string]string) {
lbs.config.requestorReToClassMapMU.Lock()
defer lbs.config.requestorReToClassMapMU.Unlock()
lbs.config.requestorReToClassMap = map[string]string{}
for k, v := range requestorToClassMap {
lbs.config.requestorReToClassMap[k] = v
}
log.Infof("set requestorToClassMap to %v", requestorToClassMap)
}
// getRebalanceMinimumDuration get the rebalance duration
func (lbs *LoadBasedAlg) getRebalanceMinimumDuration() time.Duration {
lbs.config.rebalanceMinDurationMu.RLock()
defer lbs.config.rebalanceMinDurationMu.RUnlock()
return lbs.config.rebalanceMinDuration
}
// setRebalanceMinimumDuration set the rebalance duration
func (lbs *LoadBasedAlg) setRebalanceMinimumDuration(rebalanceMinDuration time.Duration) {
lbs.config.rebalanceMinDurationMu.Lock()
defer lbs.config.rebalanceMinDurationMu.Unlock()
lbs.config.rebalanceMinDuration = rebalanceMinDuration
log.Infof("set rebalanceMinDuration to %s", rebalanceMinDuration)
}
// getRebalanceThreshold get the rebalance threshold
func (lbs *LoadBasedAlg) getRebalanceThreshold() int {
lbs.config.rebalanceThresholdMu.RLock()
defer lbs.config.rebalanceThresholdMu.RUnlock()
return lbs.config.rebalanceThreshold
}
// setRebalanceThreshold set the rebalance threshold
func (lbs *LoadBasedAlg) setRebalanceThreshold(rebalanceThreshold int) {
lbs.config.rebalanceThresholdMu.Lock()
defer lbs.config.rebalanceThresholdMu.Unlock()
lbs.config.rebalanceThreshold = rebalanceThreshold
log.Infof("set rebalanceThreshold to %d", rebalanceThreshold)
}
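// GetDataStructureSizeStats returns the sizes of the algorithm's configured and working data structures,
// keyed by the stat names used to report them.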
func (lbs *LoadBasedAlg) GetDataStructureSizeStats() map[string]int {
return map[string]int{
stats.SchedLBSConfigDescLoadPctSize: len(lbs.config.classByDescLoadPct),
stats.SchedLBSConfigLoadPercentsSize: len(lbs.config.classLoadPercents),
stats.SchedLBSConfigRequestorToPctsSize: len(lbs.config.requestorReToClassMap),
stats.SchedLBSWorkingDescLoadPctSize: len(lbs.classByDescLoadPct),
stats.SchedLBSWorkingJobClassesSize: lbs.getJobClassesSize(),
stats.SchedLBSWorkingLoadPercentsSize: len(lbs.classByDescLoadPct),
stats.SchedLBSWorkingRequestorToPctsSize: len(lbs.requestorReToClassMap),
}
}
func (lbs *LoadBasedAlg) getJobClassesSize() int {
// get sum of number of waiting tasks across all the jobClasses's job entries
s := 0
for _, jc := range lbs.jobClasses {
for _, wt := range jc.jobsByNumRunningTasks {
s += len(wt)
}
}
return s
}