func NewStatefulScheduler()

in scheduler/server/stateful_scheduler.go [222:356]


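// NewStatefulScheduler builds a statefulScheduler from the given cluster, saga,
// runner and stats dependencies, filling in package defaults for any zero-valued
// config fields. Unless DebugMode is set it starts the scheduling loop in a
// goroutine, and it can recover previously submitted jobs from the saga log on startup.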
func NewStatefulScheduler(
	nodesUpdatesCh chan []cc.NodeUpdate,
	sc saga.SagaCoordinator,
	rf RunnerFactory,
	config SchedulerConfiguration,
	stat stats.StatsReceiver,
	persistor Persistor,
	durationKeyExtractorFn func(string) string) *statefulScheduler {
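	// nodeReadyFn is the readiness check handed to clusterState: a node is only
	// accepted once its runner reports initialized and healthy, and any runs left
	// over from a previous scheduler instance are aborted first.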
	nodeReadyFn := func(node cc.Node) (bool, time.Duration) {
		run := rf(node)
		st, svc, err := run.StatusAll()
		if err != nil || !svc.Initialized || !svc.IsHealthy {
			if svc.Error != nil {
				log.WithFields(
					log.Fields{
						"node": node,
						"err":  svc.Error,
					}).Info("received service err")
				return false, 0
			}
			return false, config.ReadyFnBackoff
		}
		for _, s := range st {
			log.WithFields(
				log.Fields{
					"node":       node,
					"runID":      s.RunID,
					"state":      s.State,
					"stdout":     s.StdoutRef,
					"stderr":     s.StderrRef,
					"snapshotID": s.SnapshotID,
					"exitCode":   s.ExitCode,
					"error":      s.Error,
					"jobID":      s.JobID,
					"taskID":     s.TaskID,
					"tag":        s.Tag,
				}).Info("Aborting existing run on new node")
			run.Abort(s.RunID)
		}
		return true, 0
	}
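	// A zero ReadyFnBackoff disables the readiness check entirely.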
	if config.ReadyFnBackoff == 0 {
		nodeReadyFn = nil
	}

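	// Fill in package defaults for any zero-valued config fields.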
	if config.DefaultTaskTimeout == 0 {
		config.DefaultTaskTimeout = DefaultDefaultTaskTimeout
	}
	if config.TaskTimeoutOverhead == 0 {
		config.TaskTimeoutOverhead = DefaultTaskTimeoutOverhead
	}
	if config.MaxRequestors == 0 {
		config.MaxRequestors = DefaultMaxRequestors
	}
	if config.MaxJobsPerRequestor == 0 {
		config.MaxJobsPerRequestor = DefaultMaxJobsPerRequestor
	}
	config.SchedAlgConfig = &LoadBasedAlgConfig{
		stat: stat,
	}

	// create the load-based scheduling algorithm
	tasksByClassAndStartMap := map[taskClassAndStartKey]taskStateByJobIDTaskID{}
	sa := NewLoadBasedAlg(config.SchedAlgConfig.(*LoadBasedAlgConfig), tasksByClassAndStartMap)
	sa.setClassLoadPercents(DefaultLoadBasedSchedulerClassPercents)
	sa.setRequestorToClassMap(DefaultRequestorToClassMap)
	sa.setRebalanceMinimumDuration(DefaultMinRebalanceTime)
	config.SchedAlg = sa
	config.SchedAlgConfig = sa.config

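	// Bounded LRU caches for per-requestor history and observed task durations.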
	requestorHistory, err := lru.New(DefaultMaxRequestorHistories)
	if err != nil {
		log.Errorf("Failed to create requestorHistory cache: %v", err)
		return nil
	}

	taskDurations, err := lru.New(DefaultMaxTaskDurations)
	if err != nil {
		log.Errorf("Failed to create taskDurations cache: %v", err)
		return nil
	}

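	// Fall back to a no-op duration key extractor when none is supplied.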
	dkef := durationKeyExtractorFn
	if durationKeyExtractorFn == nil {
		dkef = nopDurationKeyExtractor
	}

	sched := &statefulScheduler{
		config:        &config,
		sagaCoord:     sc,
		runnerFactory: rf,
		asyncRunner:   async.NewRunner(),
		checkJobCh:    make(chan jobCheckMsg, 1),
		addJobCh:      make(chan jobAddedMsg, 1),
		killJobCh:     make(chan jobKillRequest, 1), // TODO - what should this value be?
		stepTicker:    time.NewTicker(TickRate),

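		// cluster membership, in-flight jobs, and per-requestor bookkeeping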
		clusterState:     newClusterState(nodesUpdatesCh, nodeReadyFn, stat),
		inProgressJobs:   make([]*jobState, 0),
		requestorMap:     make(map[string][]*jobState),
		requestorHistory: requestorHistory,
		taskDurations:    taskDurations,
		stat:             stat,

		tasksByJobClassAndStartTimeSec: tasksByClassAndStartMap,
		persistor:                      persistor,
		durationKeyExtractorFn:         dkef,
	}

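	// A negative throttle value means no cap on the number of scheduled tasks.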
	sched.setThrottle(-1)

	if sched.persistor == nil {
		log.Info("setting persistor is nil, settings will reset to default values on scheduler reset")
	}
	sched.loadSettings()

	log.Info(sched)

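	// In DebugMode the loop is not started here; the caller advances the scheduler manually.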
	if !config.DebugMode {
		// start the scheduler loop
		log.Info("Starting scheduler loop")
		go func() {
			sched.loop()
		}()
	}

	// Recover Jobs in a separate go routine to allow the scheduler
	// to accept new jobs while recovering old ones.
	if config.RecoverJobsOnStartup {
		go func() {
			recoverJobs(sched.sagaCoord, sched.addJobCh)
		}()
	}
	return sched
}
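
For orientation, here is a minimal wiring sketch. It assumes the embedding service already has a saga.SagaCoordinator and a RunnerFactory available; makeSagaCoordinator and makeRunnerFactory below are hypothetical placeholders, and the no-op StatsReceiver is likewise an assumption about the stats package, not something defined in this file.

	// Hypothetical wiring sketch; makeSagaCoordinator and makeRunnerFactory stand in
	// for whatever the embedding service actually provides.
	nodeUpdates := make(chan []cc.NodeUpdate)
	sched := NewStatefulScheduler(
		nodeUpdates,
		makeSagaCoordinator(),    // saga.SagaCoordinator supplied by the caller
		makeRunnerFactory(),      // RunnerFactory returning a runner per node
		SchedulerConfiguration{}, // zero values fall back to the defaults above
		stats.NilStatsReceiver(), // assumed no-op StatsReceiver
		nil,                      // no persistor: settings reset to defaults on restart
		nil,                      // nil extractor falls back to nopDurationKeyExtractor
	)
	if sched == nil {
		log.Fatal("scheduler construction failed (LRU cache initialization error)")
	}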