in scheduler/server/stateful_scheduler.go [222:356]
// NewStatefulScheduler builds a statefulScheduler that schedules work on the
// nodes reported over nodesUpdatesCh, obtaining a runner for each node from rf.
//
// Zero-valued DefaultTaskTimeout, TaskTimeoutOverhead, MaxRequestors and
// MaxJobsPerRequestor fields in config are replaced with the package defaults.
// Any SchedAlg/SchedAlgConfig supplied in config is unconditionally replaced by
// a load-based algorithm configured with DefaultLoadBasedSchedulerClassPercents,
// DefaultRequestorToClassMap and DefaultMinRebalanceTime.
//
// When config.ReadyFnBackoff is non-zero, a readiness function is handed to the
// cluster state. It queries the node's runner with StatusAll and returns:
//   - (false, 0) if the service reported an error,
//   - (false, config.ReadyFnBackoff) if the query failed or the service is not
//     initialized/healthy,
//   - (true, 0) otherwise, after aborting every run already present on the node.
//
// A nil persistor means settings are not persisted and will reset to default
// values when the scheduler restarts. A nil durationKeyExtractorFn falls back to
// nopDurationKeyExtractor.
//
// Unless config.DebugMode is set, the scheduler loop is started in its own
// goroutine. If config.RecoverJobsOnStartup is set, jobs are recovered from sc
// in a separate goroutine so new jobs can be accepted meanwhile. Neither
// goroutine is given a stop mechanism here.
//
// Returns nil (after logging) if either internal LRU cache cannot be created.
func NewStatefulScheduler(
	nodesUpdatesCh chan []cc.NodeUpdate,
	sc saga.SagaCoordinator,
	rf RunnerFactory,
	config SchedulerConfiguration,
	stat stats.StatsReceiver,
	persistor Persistor,
	durationKeyExtractorFn func(string) string) *statefulScheduler {
	// nodeReadyFn reads config.ReadyFnBackoff through the captured config
	// variable (the same variable sched.config points to below), so it sees the
	// value at call time rather than a snapshot.
	nodeReadyFn := func(node cc.Node) (bool, time.Duration) {
		run := rf(node)
		st, svc, err := run.StatusAll()
		if err != nil || !svc.Initialized || !svc.IsHealthy {
			if svc.Error != nil {
				log.WithFields(
					log.Fields{
						"node": node,
						"err":  svc.Error,
					}).Info("received service err")
				return false, 0
			}
			if err != nil {
				// Surface the query failure instead of silently backing off.
				log.WithFields(
					log.Fields{
						"node": node,
						"err":  err,
					}).Info("failed to query node status")
			}
			return false, config.ReadyFnBackoff
		}
		// Clear out any runs left on the node before it is reported ready.
		for _, s := range st {
			log.WithFields(
				log.Fields{
					"node":       node,
					"runID":      s.RunID,
					"state":      s.State,
					"stdout":     s.StdoutRef,
					"stderr":     s.StderrRef,
					"snapshotID": s.SnapshotID,
					"exitCode":   s.ExitCode,
					"error":      s.Error,
					"jobID":      s.JobID,
					"taskID":     s.TaskID,
					"tag":        s.Tag,
				}).Info("Aborting existing run on new node")
			// Best-effort: a failed abort is logged but does not block readiness.
			if _, abortErr := run.Abort(s.RunID); abortErr != nil {
				log.WithFields(
					log.Fields{
						"node":  node,
						"runID": s.RunID,
						"err":   abortErr,
					}).Warn("failed to abort existing run on new node")
			}
		}
		return true, 0
	}
	if config.ReadyFnBackoff == 0 {
		// NOTE(review): presumably newClusterState treats a nil readyFn as
		// "nodes are usable immediately" — confirm in cluster state.
		nodeReadyFn = nil
	}

	// Fill in package defaults for unset configuration values.
	if config.DefaultTaskTimeout == 0 {
		config.DefaultTaskTimeout = DefaultDefaultTaskTimeout
	}
	if config.TaskTimeoutOverhead == 0 {
		config.TaskTimeoutOverhead = DefaultTaskTimeoutOverhead
	}
	if config.MaxRequestors == 0 {
		config.MaxRequestors = DefaultMaxRequestors
	}
	if config.MaxJobsPerRequestor == 0 {
		config.MaxJobsPerRequestor = DefaultMaxJobsPerRequestor
	}

	// Create the load-based scheduling algorithm. The task map is shared with
	// the scheduler (tasksByJobClassAndStartTimeSec) so both see the same state.
	algConfig := &LoadBasedAlgConfig{
		stat: stat,
	}
	tasksByClassAndStartMap := map[taskClassAndStartKey]taskStateByJobIDTaskID{}
	sa := NewLoadBasedAlg(algConfig, tasksByClassAndStartMap)
	sa.setClassLoadPercents(DefaultLoadBasedSchedulerClassPercents)
	sa.setRequestorToClassMap(DefaultRequestorToClassMap)
	sa.setRebalanceMinimumDuration(DefaultMinRebalanceTime)
	config.SchedAlg = sa
	config.SchedAlgConfig = sa.config

	requestorHistory, err := lru.New(DefaultMaxRequestorHistories)
	if err != nil {
		log.Errorf("Failed to create requestorHistory cache: %v", err)
		return nil
	}
	taskDurations, err := lru.New(DefaultMaxTaskDurations)
	if err != nil {
		log.Errorf("Failed to create taskDurations cache: %v", err)
		return nil
	}

	dkef := durationKeyExtractorFn
	if durationKeyExtractorFn == nil {
		dkef = nopDurationKeyExtractor
	}

	sched := &statefulScheduler{
		config:                         &config,
		sagaCoord:                      sc,
		runnerFactory:                  rf,
		asyncRunner:                    async.NewRunner(),
		checkJobCh:                     make(chan jobCheckMsg, 1),
		addJobCh:                       make(chan jobAddedMsg, 1),
		killJobCh:                      make(chan jobKillRequest, 1), // TODO - what should this value be?
		stepTicker:                     time.NewTicker(TickRate),
		clusterState:                   newClusterState(nodesUpdatesCh, nodeReadyFn, stat),
		inProgressJobs:                 make([]*jobState, 0),
		requestorMap:                   make(map[string][]*jobState),
		requestorHistory:               requestorHistory,
		taskDurations:                  taskDurations,
		stat:                           stat,
		tasksByJobClassAndStartTimeSec: tasksByClassAndStartMap,
		persistor:                      persistor,
		durationKeyExtractorFn:         dkef,
	}
	// NOTE(review): -1 presumably means "no throttle"; loadSettings may
	// override it from persisted settings — confirm.
	sched.setThrottle(-1)
	if sched.persistor == nil {
		log.Info("setting persistor is nil, settings will reset to default values on scheduler reset")
	}
	sched.loadSettings()
	log.Info(sched)

	if !config.DebugMode {
		// start the scheduler loop
		log.Info("Starting scheduler loop")
		go sched.loop()
	}

	// Recover Jobs in a separate go routine to allow the scheduler
	// to accept new jobs while recovering old ones.
	if config.RecoverJobsOnStartup {
		go recoverJobs(sched.sagaCoord, sched.addJobCh)
	}
	return sched
}