scheduler/server/stateful_scheduler.go
package server
import (
"errors"
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
uuid "github.com/nu7hatch/gouuid"
log "github.com/sirupsen/logrus"
"github.com/twitter/scoot/async"
cc "github.com/twitter/scoot/cloud/cluster"
"github.com/twitter/scoot/common/log/hooks"
"github.com/twitter/scoot/common/log/tags"
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/runner"
"github.com/twitter/scoot/saga"
"github.com/twitter/scoot/scheduler/domain"
worker "github.com/twitter/scoot/worker/domain"
)
const (
// Clients will check for this string to differentiate between scoot and user initiated actions.
UserRequestedErrStr = "UserRequested"
// Provide defaults for config settings that should never be uninitialized/zero.
// These are reasonable defaults for a small cluster of around a couple dozen nodes.
// Nothing should run forever by default, use this timeout as a fallback.
DefaultDefaultTaskTimeout = 30 * time.Minute
// Allow extra time when waiting for a task response.
// This includes network time and the time to upload logs to bundlestore.
DefaultTaskTimeoutOverhead = 15 * time.Second
// Number of different requestors that can run jobs at any given time.
DefaultMaxRequestors = 10
// Number of jobs any single requestor can have (to prevent spamming, not for scheduler fairness).
DefaultMaxJobsPerRequestor = 100
// Set the maximum number of tasks we'd expect to queue to a nonzero value (it'll be overridden later).
DefaultSoftMaxSchedulableTasks = 1
// Threshold for jobs considered long running
LongJobDuration = 4 * time.Hour
// How often Scheduler step is called in loop
TickRate = 250 * time.Millisecond
// The max job priority we respect (higher priority is untested and disabled)
MaxPriority = domain.P2
// Max number of requestors to track tag history, and max number of tags per requestor to track
DefaultMaxRequestorHistories = 1000000
DefaultMaxHistoryTags = 100
// Max number of task IDs to track durations for
DefaultMaxTaskDurations = 1000000
)
// Used to get proper logging from tests...
func init() {
if loglevel := os.Getenv("SCOOT_LOGLEVEL"); loglevel != "" {
level, err := log.ParseLevel(loglevel)
if err != nil {
log.Error(err)
return
}
log.SetLevel(level)
log.AddHook(hooks.NewContextHook())
} else {
// setting Error level to avoid Travis test failures due to overly long logs
log.SetLevel(log.ErrorLevel)
}
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
// nopDurationKeyExtractor returns an unchanged key.
func nopDurationKeyExtractor(key string) string {
return key
}
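// A hypothetical alternative extractor (illustrative sketch only, not used in this
// file): a deployment whose taskIDs encode a retry attempt could strip that suffix
// so all attempts of the same logical task share one averageDuration entry. The
// "_attempt" suffix convention below is an assumption for illustration, not
// something this scheduler defines.
//
//	func attemptStrippingKeyExtractor(taskID string) string {
//		if i := strings.LastIndex(taskID, "_attempt"); i >= 0 {
//			return taskID[:i]
//		}
//		return taskID
//	}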
// SchedulerConfiguration variables read at initialization
// MaxRetriesPerTask - the number of times to retry a failing task before
// marking it as completed.
// DebugMode - if true, starts the scheduler up but does not start
// the update loop. Instead the loop must be advanced manually by calling step().
// RecoverJobsOnStartup - if true, the scheduler recovers active sagas
// from the sagalog and restarts them.
// DefaultTaskTimeout - default timeout for tasks.
// TaskTimeoutOverhead - how long to wait for a response after the task has timed out.
// RunnerRetryTimeout - how long to keep retrying a runner request.
// RunnerRetryInterval - how long to sleep between runner request retries.
// ReadyFnBackoff - how long to wait between runner status queries to determine [init] status.
// MaxRequestors - the number of different requestors that can run jobs at any given time.
// MaxJobsPerRequestor - the number of jobs any single requestor can have
// (to prevent spamming, not for scheduler fairness).
// TaskThrottle - requestors will try not to schedule jobs that make the scheduler exceed
// the TaskThrottle. Note: Sickle may exceed it with retries.
type SchedulerConfiguration struct {
MaxRetriesPerTask int
DebugMode bool
RecoverJobsOnStartup bool
DefaultTaskTimeout time.Duration
TaskTimeoutOverhead time.Duration
RunnerRetryTimeout time.Duration
RunnerRetryInterval time.Duration
ReadyFnBackoff time.Duration
MaxRequestors int
MaxJobsPerRequestor int
TaskThrottle int
Admins []string
SchedAlgConfig interface{}
SchedAlg SchedulingAlgorithm
}
func (sc *SchedulerConfiguration) String() string {
return fmt.Sprintf("SchedulerConfiguration: MaxRetriesPerTask: %d, DebugMode: %t, RecoverJobsOnStartup: %t, DefaultTaskTimeout: %s, "+
"TaskTimeoutOverhead: %s, RunnerRetryTimeout: %s, RunnerRetryInterval: %s, MaxRequestors: %d, MaxJobsPerRequestor: %d, TaskThrottle: %d, "+
"Admins: %v",
sc.MaxRetriesPerTask, sc.DebugMode, sc.RecoverJobsOnStartup, sc.DefaultTaskTimeout, sc.TaskTimeoutOverhead, sc.RunnerRetryTimeout,
sc.RunnerRetryInterval, sc.MaxRequestors, sc.MaxJobsPerRequestor, sc.TaskThrottle, sc.Admins)
}
// Used to keep a running average of duration for a specific task.
type averageDuration struct {
count int64
duration time.Duration
}
func (ad *averageDuration) update(d time.Duration) {
ad.count++
ad.duration = ad.duration + time.Duration(int64(d-ad.duration)/ad.count)
}
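// Worked example (sketch) of the running average above: starting from a zero-value
// averageDuration, successive updates of 10s, 20s, and 30s yield 10s, 15s, and 20s:
//
//	ad := &averageDuration{}
//	ad.update(10 * time.Second) // count=1, duration = 0 + (10s-0)/1    = 10s
//	ad.update(20 * time.Second) // count=2, duration = 10s + (20s-10s)/2 = 15s
//	ad.update(30 * time.Second) // count=3, duration = 15s + (30s-15s)/3 = 20s
//
// addOrUpdateTaskDuration (near the bottom of this file) seeds the first sample as
// {count: 1, duration: d} and calls update only for later samples, which produces
// the same sequence.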
type RunnerFactory func(node cc.Node) runner.Service
// Scheduler that keeps track of the state of running tasks & the cluster
// so that it can make smarter scheduling decisions
//
// Scheduler Concurrency: The Scheduler runs an update loop in its own go routine.
// periodically the scheduler does some async work using async.Runner. The async
// work is executed in its own Go routine, nothing in async functions should read
// or modify scheduler state directly.
//
// The callbacks are executed as part of the scheduler loop. They therefore can
// safely read & modify the scheduler state.
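//
// Illustrative sketch of this contract (not code that is called anywhere in this
// file): the first closure passed to RunAsync runs in its own goroutine and must
// not read or modify scheduler state; the callback runs later inside step() via
// asyncRunner.ProcessMessages() and may safely touch state. doRemoteWork is a
// placeholder, not a real function in this package.
//
//	var result string // written by the async fn, read later by the callback
//	s.asyncRunner.RunAsync(
//		func() error {
//			// async work (own goroutine): talk to a worker, do I/O, etc.
//			result = doRemoteWork()
//			return nil
//		},
//		func(err error) {
//			// callback (scheduler loop): safe to update jobState, clusterState, etc.
//			if err != nil {
//				log.WithFields(log.Fields{"err": err, "result": result}).Error("async work failed")
//			}
//		})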
type statefulScheduler struct {
config *SchedulerConfiguration
sagaCoord saga.SagaCoordinator
runnerFactory RunnerFactory
asyncRunner async.Runner
checkJobCh chan jobCheckMsg
addJobCh chan jobAddedMsg
killJobCh chan jobKillRequest
stepTicker *time.Ticker
// Scheduler State
clusterState *clusterState
inProgressJobs []*jobState // ordered list (by jobId) of jobs being scheduled. Note: a job may be
// in this list before any of its tasks have started.
requestorMap map[string][]*jobState // map of requestor to all its jobs. Default requestor="" is ok.
requestorHistory *lru.Cache // cache of join(requestor, basis) to new tags in the order received.
taskDurations *lru.Cache
tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID // map of tasks by their class and start time
// stats
stat stats.StatsReceiver
persistor Persistor
TaskThrottleMu sync.RWMutex // mutex must be here to avoid copying the lock when passing config to scheduler constructor
// durationKeyExtractorFn - function to extract, from taskID, the key to use for tracking task average durations
durationKeyExtractorFn func(string) string
SchedAlg SchedulingAlgorithm
}
func (s *statefulScheduler) String() string {
return fmt.Sprintf("%s, num nodes: %d, num suspended nodes: %d, maxFlakyDuration: %s, maxLostDuration: %s",
s.config, len(s.clusterState.nodes), len(s.clusterState.suspendedNodes), s.clusterState.maxFlakyDuration, s.clusterState.maxLostDuration)
}
// contains jobId to be killed and callback for the result of processing the request
type jobKillRequest struct {
jobId string
responseCh chan error
}
// Create a New StatefulScheduler that implements the Scheduler interface
// cc.Cluster - cluster of worker nodes
// saga.SagaCoordinator - the Saga Coordinator to log to and recover from
// RunnerFactory - Function which converts a node to a Runner
// SchedulerConfig - additional configuration settings for the scheduler
// StatsReceiver - stats receiver to log statistics to
// specifying debugMode true starts the scheduler up but does not start
// the update loop. Instead the loop must be advanced manually by calling
// step(); intended for debugging and test cases.
// If recoverJobsOnStartup is true, active sagas in the saga log will be recovered
// and rescheduled, otherwise no recovery will be done on startup
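//
// Construction sketch (mySagaCoordinator, myStatsReceiver, and newWorkerRunner are
// placeholders supplied by the caller, not names from this package). Zero-valued
// config fields fall back to the defaults above; a nil persistor and a nil
// durationKeyExtractorFn are accepted, as handled in the code below.
//
//	nodeUpdates := make(chan []cc.NodeUpdate)
//	sched := NewStatefulScheduler(
//		nodeUpdates,
//		mySagaCoordinator, // saga.SagaCoordinator
//		func(n cc.Node) runner.Service { return newWorkerRunner(n) }, // RunnerFactory
//		SchedulerConfiguration{MaxRetriesPerTask: 2, RecoverJobsOnStartup: true},
//		myStatsReceiver, // stats.StatsReceiver
//		nil,             // Persistor: settings reset to defaults on restart
//		nil,             // durationKeyExtractorFn: taskID used as-is
//	)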
func NewStatefulScheduler(
nodesUpdatesCh chan []cc.NodeUpdate,
sc saga.SagaCoordinator,
rf RunnerFactory,
config SchedulerConfiguration,
stat stats.StatsReceiver,
persistor Persistor,
durationKeyExtractorFn func(string) string) *statefulScheduler {
nodeReadyFn := func(node cc.Node) (bool, time.Duration) {
run := rf(node)
st, svc, err := run.StatusAll()
if err != nil || !svc.Initialized || !svc.IsHealthy {
if svc.Error != nil {
log.WithFields(
log.Fields{
"node": node,
"err": svc.Error,
}).Info("received service err")
return false, 0
}
return false, config.ReadyFnBackoff
}
for _, s := range st {
log.WithFields(
log.Fields{
"node": node,
"runID": s.RunID,
"state": s.State,
"stdout": s.StdoutRef,
"stderr": s.StderrRef,
"snapshotID": s.SnapshotID,
"exitCode": s.ExitCode,
"error": s.Error,
"jobID": s.JobID,
"taskID": s.TaskID,
"tag": s.Tag,
}).Info("Aborting existing run on new node")
run.Abort(s.RunID)
}
return true, 0
}
if config.ReadyFnBackoff == 0 {
nodeReadyFn = nil
}
if config.DefaultTaskTimeout == 0 {
config.DefaultTaskTimeout = DefaultDefaultTaskTimeout
}
if config.TaskTimeoutOverhead == 0 {
config.TaskTimeoutOverhead = DefaultTaskTimeoutOverhead
}
if config.MaxRequestors == 0 {
config.MaxRequestors = DefaultMaxRequestors
}
if config.MaxJobsPerRequestor == 0 {
config.MaxJobsPerRequestor = DefaultMaxJobsPerRequestor
}
config.SchedAlgConfig = &LoadBasedAlgConfig{
stat: stat,
}
// create the load based scheduling algorithm
tasksByClassAndStartMap := map[taskClassAndStartKey]taskStateByJobIDTaskID{}
sa := NewLoadBasedAlg(config.SchedAlgConfig.(*LoadBasedAlgConfig), tasksByClassAndStartMap)
sa.setClassLoadPercents(DefaultLoadBasedSchedulerClassPercents)
sa.setRequestorToClassMap(DefaultRequestorToClassMap)
sa.setRebalanceMinimumDuration(DefaultMinRebalanceTime)
config.SchedAlg = sa
config.SchedAlgConfig = sa.config
requestorHistory, err := lru.New(DefaultMaxRequestorHistories)
if err != nil {
log.Errorf("Failed to create requestorHistory cache: %v", err)
return nil
}
taskDurations, err := lru.New(DefaultMaxTaskDurations)
if err != nil {
log.Errorf("Failed to create taskDurations cache: %v", err)
return nil
}
dkef := durationKeyExtractorFn
if durationKeyExtractorFn == nil {
dkef = nopDurationKeyExtractor
}
sched := &statefulScheduler{
config: &config,
sagaCoord: sc,
runnerFactory: rf,
asyncRunner: async.NewRunner(),
checkJobCh: make(chan jobCheckMsg, 1),
addJobCh: make(chan jobAddedMsg, 1),
killJobCh: make(chan jobKillRequest, 1), // TODO - what should this value be?
stepTicker: time.NewTicker(TickRate),
clusterState: newClusterState(nodesUpdatesCh, nodeReadyFn, stat),
inProgressJobs: make([]*jobState, 0),
requestorMap: make(map[string][]*jobState),
requestorHistory: requestorHistory,
taskDurations: taskDurations,
stat: stat,
tasksByJobClassAndStartTimeSec: tasksByClassAndStartMap,
persistor: persistor,
durationKeyExtractorFn: dkef,
}
sched.setThrottle(-1)
if sched.persistor == nil {
log.Info("setting persistor is nil, settings will reset to default values on scheduler reset")
}
sched.loadSettings()
log.Info(sched)
if !config.DebugMode {
// start the scheduler loop
log.Info("Starting scheduler loop")
go func() {
sched.loop()
}()
}
// Recover Jobs in a separate go routine to allow the scheduler
// to accept new jobs while recovering old ones.
if config.RecoverJobsOnStartup {
go func() {
recoverJobs(sched.sagaCoord, sched.addJobCh)
}()
}
return sched
}
type jobCheckMsg struct {
jobDef *domain.JobDefinition
resultCh chan error
}
type jobAddedMsg struct {
job *domain.Job
saga *saga.Saga
}
type requestorCounts struct {
numJobsRunning int
numJobsWaitingToStart int
numTasksRunning int
numTasksWaitingToStart int
numRemainingTasks int // not completed tasks
}
/*
validate the job request. If the job passes validation, the job's tasks are queued for processing as
per the task scheduling algorithm and an id for the job is returned, otherwise the error message is returned.
*/
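// Caller-side sketch (myTasks is a placeholder slice of task definitions built
// elsewhere; only fields referenced in this file are shown):
//
//	jobDef := domain.JobDefinition{
//		Requestor: "my-service",
//		JobType:   "ci",
//		Tag:       "build-1234",
//		Basis:     "change-5678",
//		Priority:  domain.P0,
//		Tasks:     myTasks,
//	}
//	jobID, err := sched.ScheduleJob(jobDef)
//	if err != nil {
//		// rejected by checkJobsLoop: too many requestors or jobs, invalid priority,
//		// duplicate taskIDs, or an expired tag for this requestor/basis
//	}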
func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, error) {
/*
Put the job request and a callback channel on the check job channel. Wait for the
scheduling thread to pick up the request from the check job channel, verify that it
can handle the request and return either nil or an error on the callback channel.
If no error is found, generate an id for the job, start a saga for the job and add the
job to the add job channel.
Return either the error message or job id to the caller.
*/
defer s.stat.Latency(stats.SchedJobLatency_ms).Time().Stop()
s.stat.Counter(stats.SchedJobRequestsCounter).Inc(1)
log.WithFields(
log.Fields{
"requestor": jobDef.Requestor,
"jobType": jobDef.JobType,
"tag": jobDef.Tag,
"basis": jobDef.Basis,
"priority": jobDef.Priority,
"numTasks": len(jobDef.Tasks),
}).Info("New job request")
checkResultCh := make(chan error, 1)
s.checkJobCh <- jobCheckMsg{
jobDef: &jobDef,
resultCh: checkResultCh,
}
err := <-checkResultCh
if err != nil {
log.WithFields(
log.Fields{
"jobDef": jobDef,
"requestor": jobDef.Requestor,
"jobType": jobDef.JobType,
"tag": jobDef.Tag,
"basis": jobDef.Basis,
"priority": jobDef.Priority,
"err": err,
}).Error("Rejected job request")
return "", err
}
job := &domain.Job{
Id: generateJobId(),
Def: jobDef,
}
if job.Def.Tag == "" {
job.Def.Tag = job.Id
}
asBytes, err := job.Serialize()
if err != nil {
log.WithFields(
log.Fields{
"jobDef": jobDef,
"requestor": jobDef.Requestor,
"jobType": jobDef.JobType,
"tag": jobDef.Tag,
"basis": jobDef.Basis,
"priority": jobDef.Priority,
"err": err,
}).Error("Failed to serialize job request")
return "", err
}
// Log StartSaga Message
sagaObj, err := s.sagaCoord.MakeSaga(job.Id, asBytes)
if err != nil {
log.WithFields(
log.Fields{
"jobDef": jobDef,
"err": err,
"requestor": jobDef.Requestor,
"jobType": jobDef.JobType,
"tag": jobDef.Tag,
}).Error("Failed to create saga for job request")
return "", err
}
log.WithFields(
log.Fields{
"requestor": jobDef.Requestor,
"jobType": jobDef.JobType,
"tag": jobDef.Tag,
"basis": jobDef.Basis,
"priority": jobDef.Priority,
"numTasks": len(jobDef.Tasks),
}).Info("Queueing job request")
s.stat.Counter(stats.SchedJobsCounter).Inc(1)
s.addJobCh <- jobAddedMsg{
job: job,
saga: sagaObj,
}
return job.Id, nil
}
// generates a jobId using a random uuid
func generateJobId() string {
// uuid.NewV4() should never actually return an error. The code uses
// rand.Read Api to generate the uuid, which according to golang docs
// "Read always returns ... a nil error" https://golang.org/pkg/math/rand/#Read
for {
if id, err := uuid.NewV4(); err == nil {
return id.String()
}
}
}
// run the scheduler loop indefinitely in its own thread.
// we are not putting any logic other than looping in this method so unit tests can verify
// behavior by controlling calls to step() below
func (s *statefulScheduler) loop() {
for {
s.step()
// Wait until our TickRate has elapsed or we have a pending action.
// Detect pending action by monitoring statefulScheduler's job channels.
// Since "detect" means we pulled off of a channel, put it back,
// asynchronously in case the channel is blocked/full (it will be drained next step())
select {
case msg := <-s.checkJobCh:
go func() {
s.checkJobCh <- msg
}()
case msg := <-s.addJobCh:
go func() {
s.addJobCh <- msg
}()
case msg := <-s.killJobCh:
go func() {
s.killJobCh <- msg
}()
case <-s.stepTicker.C:
}
}
}
// run one loop iteration
func (s *statefulScheduler) step() {
defer s.stat.Latency(stats.SchedStepLatency_ms).Time().Stop()
// update scheduler state with messages received since last loop
// nodes added or removed to cluster, new jobs scheduled,
// async functions completed & invoke callbacks
s.addJobs()
s.clusterState.updateCluster()
procMessagesLatency := s.stat.Latency(stats.SchedProcessMessagesLatency_ms).Time()
s.asyncRunner.ProcessMessages()
procMessagesLatency.Stop()
// TODO: make processUpdates on scheduler state wait until an update
// has been received
// instead of just burning CPU and constantly looping while no updates
// have occurred
s.checkForCompletedJobs()
s.killJobs()
s.scheduleTasks()
s.updateStats()
}
// update the stats monitoring values:
// - number of job requests running or waiting to start
// - number of jobs waiting to start
// - number of tasks currently running
// - total number of waiting or running tasks
//
// for each unique requestor count:
// - number of tasks running
// - number of tasks waiting
// - number of jobs running or waiting to start
func (s *statefulScheduler) updateStats() {
remainingTasks := 0
waitingTasks := 0
runningTasks := 0
jobsWaitingToStart := 0
requestorsCounts := make(map[string]*requestorCounts) // map of requestor to job and task stats counts
// get job and task counts by requestor, and overall jobs stats
for _, job := range s.inProgressJobs {
requestor := job.Job.Def.Requestor
if _, ok := requestorsCounts[requestor]; !ok {
// first time we've seen this requestor, initialize its map entry
counts := &requestorCounts{}
requestorsCounts[requestor] = counts
}
running := job.GetNumRunning()
completed := job.GetNumCompleted()
remaining := len(job.Tasks) - completed
waiting := remaining - running
status := job.getJobStatus()
// accumulate totals independent of requestors
remainingTasks += remaining
waitingTasks += waiting
runningTasks += running
// totals by requestor
if job.TasksCompleted+job.TasksRunning == 0 {
jobsWaitingToStart += 1
requestorsCounts[requestor].numJobsWaitingToStart++
} else if status == domain.InProgress {
requestorsCounts[requestor].numJobsRunning++
}
requestorsCounts[requestor].numRemainingTasks += remaining
requestorsCounts[requestor].numTasksRunning += running
requestorsCounts[requestor].numTasksWaitingToStart += waiting
if time.Since(job.TimeMarker) > LongJobDuration {
job.TimeMarker = time.Now()
log.WithFields(
log.Fields{
"requestor": job.Job.Def.Requestor,
"jobType": job.Job.Def.JobType,
"jobId": job.Job.Id,
"tag": job.Job.Def.Tag,
"basis": job.Job.Def.Basis,
"priority": job.Job.Def.Priority,
"numTasks": len(job.Tasks),
"tasksRunning": job.TasksRunning,
"tasksCompleted": job.TasksCompleted,
"runTime": time.Since(job.TimeCreated),
}).Info("Long-running job")
}
}
// publish the requestor stats
tasksRunningCheckSum := 0
tasksWaitingCheckSum := 0
tasksRemainingCheckSum := 0
for requestor, counts := range requestorsCounts {
s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningJobsGauge, requestor)).Update(int64(
counts.numJobsRunning))
s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedWaitingJobsGauge, requestor)).Update(int64(
counts.numJobsWaitingToStart))
s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningTasksGauge, requestor)).Update(int64(
counts.numTasksRunning))
s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumWaitingTasksGauge, requestor)).Update(int64(
counts.numTasksWaitingToStart))
s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedInProgressTasksGauge, requestor)).Update(int64(
counts.numRemainingTasks))
tasksRunningCheckSum += counts.numTasksRunning
tasksWaitingCheckSum += counts.numTasksWaitingToStart
tasksRemainingCheckSum += counts.numRemainingTasks
}
if tasksRunningCheckSum != runningTasks || tasksWaitingCheckSum != waitingTasks || tasksRemainingCheckSum != remainingTasks {
log.Errorf("stats checksum error\nrunning: expected: %d, got:%d\n waiting: expected: %d, got:%d\nremaining: expected:%d, got:%d",
runningTasks, tasksRunningCheckSum, waitingTasks, tasksWaitingCheckSum, remainingTasks, tasksRemainingCheckSum)
}
// publish the rest of the stats
s.stat.Gauge(stats.SchedAcceptedJobsGauge).Update(int64(len(s.inProgressJobs)))
s.stat.Gauge(stats.SchedWaitingJobsGauge).Update(int64(jobsWaitingToStart))
s.stat.Gauge(stats.SchedInProgressTasksGauge).Update(int64(remainingTasks))
s.stat.Gauge(stats.SchedNumRunningTasksGauge).Update(int64(runningTasks))
s.stat.Gauge(stats.SchedNumWaitingTasksGauge).Update(int64(waitingTasks))
s.stat.Gauge(stats.SchedNumAsyncRunnersGauge).Update(int64(s.asyncRunner.NumRunning())) //TODO remove when done debugging
// publish internal data structure sizes
lbs := s.config.SchedAlg.(*LoadBasedAlg)
lbsStats := lbs.GetDataStructureSizeStats()
for k, v := range lbsStats {
s.stat.Gauge(k).Update(int64(v))
}
s.stat.Gauge(stats.SchedTaskStartTimeMapSize).Update(int64(s.getSchedTaskStartTimeMapSize()))
s.stat.Gauge(stats.SchedInProgressJobsSize).Update(int64(len(s.inProgressJobs)))
s.stat.Gauge(stats.SchedRequestorMapSize).Update(int64(len(s.requestorMap)))
s.stat.Gauge(stats.SchedRequestorHistorySize).Update(int64(s.requestorHistory.Len()))
s.stat.Gauge(stats.SchedTaskDurationsSize).Update(int64(s.taskDurations.Len()))
s.stat.Gauge(stats.SchedSagasSize).Update(int64(s.sagaCoord.GetNumSagas()))
s.stat.Gauge(stats.SchedRunnersSize).Update(int64(s.asyncRunner.NumRunning()))
}
// getSchedTaskStartTimeMapSize gets the number of running tasks being tracked by the tasksByJobClassAndStartTimeSec map
// (should never be larger than the number of workers)
func (s *statefulScheduler) getSchedTaskStartTimeMapSize() int {
sz := 0
for _, v := range s.tasksByJobClassAndStartTimeSec {
sz += len(v)
}
return sz
}
func (s *statefulScheduler) getSchedulerTaskCounts() (int, int, int) {
// get the count of total tasks across all jobs, completed tasks and
// running tasks
var total, completed, running int
for _, job := range s.inProgressJobs {
total += len(job.Tasks)
completed += job.TasksCompleted
running += job.TasksRunning
}
return total, completed, running
}
// Check all new job requests (on the check job channel) that have come in since the last iteration of the step() loop,
// and verify each one: it doesn't exceed the max number of requestors or the requestor's job limit, has a valid
// priority, doesn't contain duplicate taskIDs, and doesn't reuse an expired tag for its requestor/basis.
//
// If the job fails validation, put an error on the job's callback channel, otherwise put nil on it.
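//
// Worked example of the basis/tag expiration check below (sketch): suppose requestor
// "svcA" with basis "change-1" has already submitted jobs tagged "t1" and then "t2",
// so the history stored under key "svcA"+"change-1" is ["t1", "t2"].
//
//	// tag "t1" -> rejected: expired tag=t1 ... Expected either tag=t2 or new tag
//	// tag "t2" -> accepted (it is the latest tag) and appended to the history
//	// tag "t3" -> accepted (a new tag) and appended to the history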
func (s *statefulScheduler) checkJobsLoop() {
defer s.stat.Latency(stats.SchedCheckJobsLoopLatency_ms).Time().Stop()
for {
select {
case checkJobMsg := <-s.checkJobCh:
var err error
if jobs, ok := s.requestorMap[checkJobMsg.jobDef.Requestor]; !ok && len(s.requestorMap) >= s.config.MaxRequestors {
err = fmt.Errorf("exceeds max number of requestors: %s (%d)", checkJobMsg.jobDef.Requestor, s.config.MaxRequestors)
} else if len(jobs) >= s.config.MaxJobsPerRequestor {
err = fmt.Errorf("exceeds max jobs per requestor: %s (%d)", checkJobMsg.jobDef.Requestor, s.config.MaxJobsPerRequestor)
} else if checkJobMsg.jobDef.Priority < domain.P0 || checkJobMsg.jobDef.Priority > domain.P2 {
err = fmt.Errorf("invalid priority %d, must be between 0-2 inclusive", checkJobMsg.jobDef.Priority)
} else {
// Check for duplicate task names
seenTasks := map[string]bool{}
for _, t := range checkJobMsg.jobDef.Tasks {
if _, ok := seenTasks[t.TaskID]; ok {
err = fmt.Errorf("invalid dup taskID %s", t.TaskID)
break
}
seenTasks[t.TaskID] = true
}
}
if err == nil && checkJobMsg.jobDef.Basis != "" {
// Check if the given tag is expired for the given requestor & basis.
rb := checkJobMsg.jobDef.Requestor + checkJobMsg.jobDef.Basis
rh := getRequestorHistory(s.requestorHistory, rb)
if stringInSlice(checkJobMsg.jobDef.Tag, rh) &&
rh[len(rh)-1] != checkJobMsg.jobDef.Tag {
err = fmt.Errorf("expired tag=%s for basis=%s. Expected either tag=%s or new tag",
checkJobMsg.jobDef.Tag, checkJobMsg.jobDef.Basis, rh[len(rh)-1])
} else {
addOrUpdateRequestorHistory(s.requestorHistory, rb, checkJobMsg.jobDef.Tag)
}
}
checkJobMsg.resultCh <- err
default:
return
}
}
}
// after all new job requests have been verified, take each job that was put on the add job channel since the last
// pass through step(), add it to the inProgress list, order its tasks by descending duration, and
// add it to the requestor map
func (s *statefulScheduler) addJobsLoop() {
defer s.stat.Latency(stats.SchedAddJobsLoopLatency_ms).Time().Stop()
var (
receivedJob bool
total int
completed int
running int
)
for {
select {
case newJobMsg := <-s.addJobCh:
receivedJob = true
lf := log.Fields{
"jobID": newJobMsg.job.Id,
"requestor": newJobMsg.job.Def.Requestor,
"jobType": newJobMsg.job.Def.JobType,
"tag": newJobMsg.job.Def.Tag,
"basis": newJobMsg.job.Def.Basis,
"priority": newJobMsg.job.Def.Priority,
"numTasks": len(newJobMsg.job.Def.Tasks),
}
if _, ok := s.requestorMap[newJobMsg.job.Def.Requestor]; ok {
for _, js := range s.requestorMap[newJobMsg.job.Def.Requestor] {
// Kill any prior jobs that share the same requestor and non-empty basis string,
// but tag should be different so we don't kill the original jobs when doing retries for example.
// (retries share the same requestor, basis, and tag values).
if newJobMsg.job.Def.Basis != "" &&
js.Job.Def.Basis == newJobMsg.job.Def.Basis &&
js.Job.Def.Tag != newJobMsg.job.Def.Tag {
log.WithFields(lf).Infof("Matching basis, killing prevJobID=%s", js.Job.Id)
go s.KillJob(js.Job.Id) // we are in the main thread, this function must be called async.
}
// If we have an existing job with this requestor/tag combination, make sure we use its priority level.
// Not an error since we can consider priority to be a suggestion which we'll handle contextually.
if js.Job.Def.Tag == newJobMsg.job.Def.Tag &&
js.Job.Def.Basis != newJobMsg.job.Def.Basis &&
js.Job.Def.Priority != newJobMsg.job.Def.Priority {
log.WithFields(lf).Info("Overriding job priority to match previous requestor/tag priority")
newJobMsg.job.Def.Priority = js.Job.Def.Priority
}
}
} else if newJobMsg.job.Def.Priority > MaxPriority {
// Priorities greater than 2 are disabled in job_state.go.
jd := newJobMsg.job.Def
log.Infof("Overriding job priority %d to respect max priority of %d (higher priority is untested and disabled)"+
"Requestor:%s, JobType:%s, Tag:%s, Basis:%s, Priority:%d, numTasks: %d",
jd.Priority, MaxPriority, jd.Requestor, jd.JobType, jd.Tag, jd.Basis, jd.Priority, len(jd.Tasks))
newJobMsg.job.Def.Priority = MaxPriority
}
reqToClassMap, _ := s.GetRequestorToClassMap()
jc := GetRequestorClass(newJobMsg.job.Def.Requestor, reqToClassMap)
js := newJobState(newJobMsg.job, jc, newJobMsg.saga, s.taskDurations, s.tasksByJobClassAndStartTimeSec, s.durationKeyExtractorFn)
s.inProgressJobs = append(s.inProgressJobs, js)
sort.Sort(sort.Reverse(taskStatesByDuration(js.Tasks)))
req := newJobMsg.job.Def.Requestor
if _, ok := s.requestorMap[req]; !ok {
s.requestorMap[req] = []*jobState{}
}
s.requestorMap[req] = append(s.requestorMap[req], js)
log.WithFields(lf).Info("Created new job")
default:
if receivedJob {
total, completed, running = s.getSchedulerTaskCounts()
log.WithFields(
log.Fields{
"unscheduledTasks": total - completed - running,
"runningTasks": running,
"completedTasks": completed,
"totalTasks": total,
}).Info("Added jobs")
}
return
}
}
}
// Checks if any new jobs have been requested since the last loop and adds
// them to the jobs the scheduler is handling
func (s *statefulScheduler) addJobs() {
defer s.stat.Latency(stats.SchedAddJobsLatency_ms).Time().Stop()
s.checkJobsLoop()
s.addJobsLoop()
}
// Helpers, assumes that jobId is present given a consistent scheduler state.
func (s *statefulScheduler) deleteJob(jobId string) {
var requestor string
for i, job := range s.inProgressJobs {
if job.Job.Id == jobId {
requestor = job.Job.Def.Requestor
s.inProgressJobs = append(s.inProgressJobs[:i], s.inProgressJobs[i+1:]...)
}
}
jobs := s.requestorMap[requestor]
for i, job := range jobs {
if job.Job.Id == jobId {
s.requestorMap[requestor] = append(jobs[:i], jobs[i+1:]...)
}
}
if len(s.requestorMap[requestor]) == 0 {
delete(s.requestorMap, requestor)
}
}
func (s *statefulScheduler) getJob(jobId string) *jobState {
for _, job := range s.inProgressJobs {
if job.Job.Id == jobId {
return job
}
}
return nil
}
// checks if any of the in progress jobs are completed. If a job is
// completed, log an EndSaga message to the SagaLog asynchronously
func (s *statefulScheduler) checkForCompletedJobs() {
defer s.stat.Latency(stats.SchedCheckForCompletedLatency_ms).Time().Stop()
// Check For Completed Jobs & Log EndSaga Message
for _, jobState := range s.inProgressJobs {
if jobState.getJobStatus() == domain.Completed && !jobState.EndingSaga {
// mark job as being completed
jobState.EndingSaga = true
// set up variables for the async function & callbacks
j := jobState
s.asyncRunner.RunAsync(
func() error {
// FIXME: seeing panic on closed channel here after killjob().
return j.Saga.EndSaga()
},
func(err error) {
if err == nil {
log.WithFields(
log.Fields{
"jobID": j.Job.Id,
"requestor": j.Job.Def.Requestor,
"jobType": j.Job.Def.JobType,
"tag": j.Job.Def.Tag,
}).Info("Job completed and logged")
// This job is fully processed, remove it from inProgressJobs
s.deleteJob(j.Job.Id)
} else {
// set the jobState flag to false, will retry logging
// EndSaga message on next scheduler loop
j.EndingSaga = false
s.stat.Counter(stats.SchedRetriedEndSagaCounter).Inc(1) // TODO errata metric - remove if unused
log.WithFields(
log.Fields{
"jobID": j.Job.Id,
"err": err,
"requestor": j.Job.Def.Requestor,
"jobType": j.Job.Def.JobType,
"tag": j.Job.Def.Tag,
}).Info("Job completed but failed to log")
}
})
}
}
}
// figures out which tasks to schedule next and on which worker and then runs them
func (s *statefulScheduler) scheduleTasks() {
// Calculate a list of Tasks to Node Assignments & start running all those jobs
// Pass nil config so taskScheduler can determine the most appropriate values itself.
defer s.stat.Latency(stats.SchedScheduleTasksLatency_ms).Time().Stop()
taskAssignments := s.getTaskAssignments()
for _, ta := range taskAssignments {
// Set up variables for async functions & callback
task := ta.task
nodeSt := ta.nodeSt
jobID := task.JobId
taskID := task.TaskId
requestor := s.getJob(jobID).Job.Def.Requestor
jobType := s.getJob(jobID).Job.Def.JobType
tag := s.getJob(jobID).Job.Def.Tag
taskDef := task.Def
taskDef.JobID = jobID
taskDef.Tag = tag
jobState := s.getJob(jobID)
sa := jobState.Saga
rs := s.runnerFactory(nodeSt.node)
durationID := s.durationKeyExtractorFn(taskID)
preventRetries := task.NumTimesTried >= s.config.MaxRetriesPerTask
avgDur := -1
iface, ok := s.taskDurations.Get(durationID)
if ok {
ad, ok := iface.(*averageDuration)
if !ok {
log.Errorf("getting task duration, object was not *averageDuration type! (it is %s)", reflect.TypeOf(ad))
} else {
avgDur = int(ad.duration)
}
}
log.WithFields(
log.Fields{
"jobID": jobID,
"taskID": taskID,
"node": nodeSt.node,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
"taskDef": taskDef,
"durationID": durationID,
"avgDuration": fmt.Sprintf("%d (sec)", avgDur),
}).Info("Starting taskRunner")
tRunner := &taskRunner{
saga: sa,
runner: rs,
stat: s.stat,
defaultTaskTimeout: s.config.DefaultTaskTimeout,
taskTimeoutOverhead: s.config.TaskTimeoutOverhead,
runnerRetryTimeout: s.config.RunnerRetryTimeout,
runnerRetryInterval: s.config.RunnerRetryInterval,
markCompleteOnFailure: preventRetries,
LogTags: tags.LogTags{
JobID: jobID,
TaskID: taskID,
Tag: tag,
},
task: taskDef,
nodeSt: nodeSt,
abortCh: make(chan abortReq, 1),
queryAbortCh: make(chan interface{}, 1),
startTime: time.Now(),
}
// mark the task as started in the jobState and record its taskRunner
jobState.taskStarted(taskID, tRunner)
s.asyncRunner.RunAsync(
tRunner.run,
func(err error) {
defer rs.Release()
// Update the average duration for this task so, for new jobs, we can schedule the likely long running tasks first.
if err == nil || err.(*taskError).st.State == runner.TIMEDOUT ||
(err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) {
addOrUpdateTaskDuration(s.taskDurations, durationID, time.Since(tRunner.startTime))
}
// If the node is absent, or was deleted then re-added, then we need to selectively clean up.
// The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup.
// We need the fake value so we don't clobber any new job assignments to that nodeId.
nodeId := nodeSt.node.Id()
nodeStInstance, ok := s.clusterState.getNodeState(nodeId)
nodeAbsent := !ok
nodeReAdded := false
if !nodeAbsent {
nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh)
}
nodeStChanged := nodeAbsent || nodeReAdded
preempted := false
if nodeStChanged {
nodeId = nodeId + ":ERROR"
log.WithFields(
log.Fields{
"node": nodeSt.node,
"jobID": jobID,
"taskID": taskID,
"runningJob": nodeSt.runningJob,
"runningTask": nodeSt.runningTask,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Task *node* lost, cleaning up.")
}
if nodeReAdded {
preempted = true
}
flaky := false
aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED)
if err != nil {
// Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes.
// TODO - we no longer set a node as flaky on failed status.
// In practice, we've observed that this results in checkout failures causing
// nodes to drop out of the cluster and reduce capacity to no benefit.
// A more comprehensive solution would be to overhaul this behavior.
taskErr := err.(*taskError)
flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED)
msg := "Error running job (will be retried):"
if aborted {
msg = "Error running task, but job kill request received, (will not retry):"
err = nil
} else {
if preventRetries {
msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask)
err = nil
} else {
jobState.errorRunningTask(taskID, err, preempted)
}
}
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"err": taskErr,
"cmd": strings.Join(taskDef.Argv, " "),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info(msg)
// If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds.
if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil {
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
}).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ")
// TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask
go func() {
for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) {
time.Sleep(time.Second)
}
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ")
}()
}
}
if err == nil || aborted {
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"command": strings.Join(taskDef.Argv, " "),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Ending task.")
jobState.taskCompleted(taskID, true)
}
// update cluster state that this node is now free and if we consider the runner to be flaky.
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"node": nodeSt.node,
"flaky": flaky,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Freeing node, removed job.")
s.clusterState.taskCompleted(nodeId, flaky)
total := 0
completed := 0
running := 0
for _, job := range s.inProgressJobs {
total += len(job.Tasks)
completed += job.TasksCompleted
running += job.TasksRunning
}
log.WithFields(
log.Fields{
"jobId": jobID,
"running": jobState.TasksRunning,
"completed": jobState.TasksCompleted,
"total": len(jobState.Tasks),
"isdone": jobState.TasksCompleted == len(jobState.Tasks),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info()
log.WithFields(
log.Fields{
"running": running,
"completed": completed,
"total": total,
"alldone": completed == total,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Jobs task summary")
})
}
}
// Put the kill request on the channel that is processed by the main
// scheduler loop, and wait for the response
func (s *statefulScheduler) KillJob(jobID string) error {
log.WithFields(
log.Fields{
"jobID": jobID,
}).Info("KillJob requested")
responseCh := make(chan error, 1)
req := jobKillRequest{jobId: jobID, responseCh: responseCh}
s.killJobCh <- req
return <-req.responseCh
}
func (s *statefulScheduler) GetSagaCoord() saga.SagaCoordinator {
return s.sagaCoord
}
func (s *statefulScheduler) OfflineWorker(req domain.OfflineWorkerReq) error {
if !stringInSlice(req.Requestor, s.config.Admins) && len(s.config.Admins) != 0 {
return fmt.Errorf("requestor %s unauthorized to offline worker", req.Requestor)
}
log.Infof("Offlining worker %s", req.ID)
n := cc.NodeId(req.ID)
if !s.clusterState.HasOnlineNode(n) {
return fmt.Errorf("node %s was not present in nodes. It can't be offlined", req.ID)
}
s.clusterState.OfflineNode(n)
return nil
}
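// Authorization rule shared by OfflineWorker above and ReinstateWorker below: an
// empty Admins list means any requestor may offline/reinstate workers; a non-empty
// list restricts these calls to the listed requestors. For example:
//
//	// config.Admins == nil or []string{}        -> every req.Requestor is allowed
//	// config.Admins == []string{"alice", "bob"} -> only "alice" and "bob" are allowed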
func (s *statefulScheduler) ReinstateWorker(req domain.ReinstateWorkerReq) error {
if !stringInSlice(req.Requestor, s.config.Admins) && len(s.config.Admins) != 0 {
return fmt.Errorf("requestor %s unauthorized to reinstate worker", req.Requestor)
}
n := cc.NodeId(req.ID)
if !s.clusterState.IsOfflined(n) {
return fmt.Errorf("node %s was not present in offlinedNodes. It can't be reinstated", req.ID)
} else {
log.Infof("Reinstating worker %s", req.ID)
s.clusterState.OnlineNode(n)
return nil
}
}
// process all kill requests, verifying that the jobIds exist: send errors back
// immediately on the request channel for jobIds that don't exist, then
// kill all the jobs with a valid ID
//
// this function is part of the main scheduler loop
func (s *statefulScheduler) killJobs() {
defer s.stat.Latency(stats.SchedKillJobsLatency_ms).Time().Stop()
var validKillRequests []jobKillRequest
// validate jobIds, sending errors back for invalid ids and building a list of valid requests
for haveKillRequest := true; haveKillRequest; {
select {
case req := <-s.killJobCh:
// can we find the job?
jobState := s.getJob(req.jobId)
if jobState == nil {
req.responseCh <- fmt.Errorf("cannot kill Job Id %s, not found."+
" (This error can be ignored if kill was a fire-and-forget defensive request)."+
" The job may be finished, "+
" the request may still be in the queue to be scheduled, or "+
" the id may be invalid. "+
" Check the job status, verify the id and/or resubmit the kill request after a few moments.",
req.jobId)
} else if jobState.JobKilled {
req.responseCh <- fmt.Errorf("job Id %s was already killed, request ignored", req.jobId)
} else {
jobState.JobKilled = true
validKillRequests = append(validKillRequests, req)
}
default:
haveKillRequest = false
}
}
// kill the jobs with valid ids
s.processKillJobRequests(validKillRequests)
}
func (s *statefulScheduler) processKillJobRequests(reqs []jobKillRequest) {
for _, req := range reqs {
jobState := s.getJob(req.jobId)
logFields := log.Fields{
"jobID": req.jobId,
"requestor": jobState.Job.Def.Requestor,
"jobType": jobState.Job.Def.JobType,
"tag": jobState.Job.Def.Tag,
}
var updateMessages []saga.SagaMessage
for _, task := range s.getJob(req.jobId).Tasks {
msgs := s.abortTask(jobState, task, logFields, UserRequestedErrStr)
updateMessages = append(updateMessages, msgs...)
}
if len(updateMessages) > 0 {
if err := jobState.Saga.BulkMessage(updateMessages); err != nil {
logFields["err"] = err
log.WithFields(logFields).Error("killJobs saga.BulkMessage failure")
}
}
delete(logFields, "err")
delete(logFields, "taskID")
log.WithFields(logFields).Info("killJobs summary")
req.responseCh <- nil
}
}
// abortTask aborts a task - triggered when killing a job or when the scheduling algorithm rebalances the workers
func (s *statefulScheduler) abortTask(jobState *jobState, task *taskState, logFields log.Fields, abortMsg string) []saga.SagaMessage {
logFields["taskID"] = task.TaskId
log.WithFields(logFields).Info("aborting task")
var updateMessages []saga.SagaMessage
if task.Status == domain.InProgress {
task.TaskRunner.Abort(true, abortMsg)
} else if task.Status == domain.NotStarted {
st := runner.AbortStatus("", tags.LogTags{JobID: jobState.Job.Id, TaskID: task.TaskId})
st.Error = UserRequestedErrStr
statusAsBytes, err := worker.SerializeProcessStatus(st)
if err != nil {
s.stat.Counter(stats.SchedFailedTaskSerializeCounter).Inc(1) // TODO errata metric - remove if unused
}
s.stat.Counter(stats.SchedCompletedTaskCounter).Inc(1)
updateMessages = append(updateMessages, saga.MakeStartTaskMessage(jobState.Saga.ID(), task.TaskId, nil))
updateMessages = append(updateMessages, saga.MakeEndTaskMessage(jobState.Saga.ID(), task.TaskId, statusAsBytes))
jobState.taskCompleted(task.TaskId, false)
}
return updateMessages
}
func getRequestorHistory(requestorHistory *lru.Cache, requestor string) []string {
var history []string
iface, ok := requestorHistory.Get(requestor)
if ok {
history, _ = iface.([]string)
}
return history
}
func addOrUpdateRequestorHistory(requestorHistory *lru.Cache, requestor, newHistory string) {
var history []string
iface, ok := requestorHistory.Get(requestor)
if ok {
history, ok = iface.([]string)
if !ok {
return
}
history = append(history, newHistory)
} else {
// first tag seen for this requestor+basis key, start its history
history = []string{newHistory}
}
if len(history) > DefaultMaxHistoryTags {
history = history[1:]
}
requestorHistory.Add(requestor, history)
}
func addOrUpdateTaskDuration(taskDurations *lru.Cache, durationKey string, d time.Duration) {
var ad *averageDuration
iface, ok := taskDurations.Get(durationKey)
if !ok {
ad = &averageDuration{count: 1, duration: d}
taskDurations.Add(durationKey, ad)
} else {
ad, ok = iface.(*averageDuration)
if !ok {
log.Errorf("task duration object was not *averageDuration type! (it is %s)", reflect.TypeOf(ad))
return
}
ad.update(d)
}
}
// set the max schedulable tasks. -1 = unlimited, 0 = don't accept any more requests, >0 = only accept job
// requests when the number of running and waiting tasks won't exceed the limit
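//
// Throttle usage sketch (values as described above; s is a *statefulScheduler):
//
//	s.SetSchedulerStatus(0)   // stop accepting new job requests
//	s.SetSchedulerStatus(500) // accept a job only if running+waiting tasks stay within 500
//	s.SetSchedulerStatus(-1)  // unlimited; remove the cap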
func (s *statefulScheduler) SetSchedulerStatus(maxTasks int) error {
err := domain.ValidateMaxTasks(maxTasks)
if err != nil {
return err
}
s.setThrottle(maxTasks)
log.Infof("scheduler throttled to %d", maxTasks)
if err := s.persistSettings(); err != nil {
return fmt.Errorf("throttle setting not persisted, scheduler may use default when restarted. %s", err)
}
return nil
}
// return
// - the current number of tasks running or waiting to run
// - the max number of tasks the scheduler will handle (the throttle), -1 -> there is no max number
func (s *statefulScheduler) GetSchedulerStatus() (int, int) {
total, completed, _ := s.getSchedulerTaskCounts()
taskCnt := total - completed
return taskCnt, s.getThrottle()
}
func (s *statefulScheduler) SetSchedulingAlg(sa SchedulingAlgorithm) {
s.config.SchedAlg = sa
}
// GetClassLoadPercents returns a copy of the ClassLoadPercents
func (s *statefulScheduler) GetClassLoadPercents() (map[string]int32, error) {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return nil, fmt.Errorf("not using load based scheduler, no load percents")
}
return sa.getClassLoadPercents(), nil
}
// SetClassLoadPercents sets the scheduler's class load pcts with a copy of the input class load pcts
func (s *statefulScheduler) SetClassLoadPercents(classLoadPercents map[string]int32) error {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return fmt.Errorf("not using load based scheduler, class load pcts ignored")
}
sa.setClassLoadPercents(classLoadPercents)
if err := s.persistSettings(); err != nil {
return fmt.Errorf("load percents not persisted, scheduler may use default when restarted. %s", err)
}
return nil
}
// GetRequestorToClassMap returns a copy of the RequestorToClassMap
func (s *statefulScheduler) GetRequestorToClassMap() (map[string]string, error) {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return nil, fmt.Errorf("not using load based scheduler, no class map")
}
return sa.getRequestorToClassMap(), nil
}
// SetRequestorToClassMap sets the scheduler's requestor to class map with a copy of the input map
func (s *statefulScheduler) SetRequestorToClassMap(requestorToClassMap map[string]string) error {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return fmt.Errorf("not using load based scheduler, requestor to class map ignored")
}
sa.setRequestorToClassMap(requestorToClassMap)
if err := s.persistSettings(); err != nil {
return fmt.Errorf("RequestorToClassMap not persisted, scheduler may use default when restarted. %s", err)
}
return nil
}
// GetRebalanceMinimumDuration
func (s *statefulScheduler) GetRebalanceMinimumDuration() (time.Duration, error) {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return 0, fmt.Errorf("not using load based scheduler, no rebalance min duration")
}
return sa.getRebalanceMinimumDuration(), nil
}
// SetRebalanceMinimumDuration
func (s *statefulScheduler) SetRebalanceMinimumDuration(durationMin time.Duration) error {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return fmt.Errorf("not using load based scheduler, requestor to rebalance min duration ignored")
}
sa.setRebalanceMinimumDuration(durationMin)
if err := s.persistSettings(); err != nil {
return fmt.Errorf("RebalanceMinimumDuration not persisted, scheduler may use default when restarted. %s", err)
}
return nil
}
// GetRebalanceThreshold
func (s *statefulScheduler) GetRebalanceThreshold() (int32, error) {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return 0, fmt.Errorf("not using load based scheduler, no rebalance threshold")
}
return int32(sa.getRebalanceThreshold()), nil
}
// SetRebalanceThreshold
func (s *statefulScheduler) SetRebalanceThreshold(threshold int32) error {
sa, ok := s.config.SchedAlg.(*LoadBasedAlg)
if !ok {
return fmt.Errorf("not using load based scheduler, requestor to rebalance threshold ignored")
}
sa.setRebalanceThreshold(int(threshold))
if err := s.persistSettings(); err != nil {
return fmt.Errorf("RebalanceThreshold not persisted, scheduler may use default when restarted. %s", err)
}
return nil
}
func (s *statefulScheduler) SetPersistor(persistor Persistor) {
s.persistor = persistor
}
func (s *statefulScheduler) getThrottle() int {
s.TaskThrottleMu.RLock()
defer s.TaskThrottleMu.RUnlock()
return s.config.TaskThrottle
}
func (s *statefulScheduler) setThrottle(throttle int) {
s.TaskThrottleMu.Lock()
defer s.TaskThrottleMu.Unlock()
s.config.TaskThrottle = throttle
}