in scheduler/server/stateful_scheduler.go [730:807]
func (s *statefulScheduler) addJobsLoop() {
defer s.stat.Latency(stats.SchedAddJobsLoopLatency_ms).Time().Stop()
var (
receivedJob bool
total int
completed int
running int
)
for {
select {
case newJobMsg := <-s.addJobCh:
receivedJob = true
lf := log.Fields{
"jobID": newJobMsg.job.Id,
"requestor": newJobMsg.job.Def.Requestor,
"jobType": newJobMsg.job.Def.JobType,
"tag": newJobMsg.job.Def.Tag,
"basis": newJobMsg.job.Def.Basis,
"priority": newJobMsg.job.Def.Priority,
"numTasks": len(newJobMsg.job.Def.Tasks),
}
if _, ok := s.requestorMap[newJobMsg.job.Def.Requestor]; ok {
for _, js := range s.requestorMap[newJobMsg.job.Def.Requestor] {
// Kill any prior jobs that share the same requestor and non-empty basis string,
// but tag should be different so we don't kill the original jobs when doing retries for example.
// (retries share the same requestor, basis, and tag values).
if newJobMsg.job.Def.Basis != "" &&
js.Job.Def.Basis == newJobMsg.job.Def.Basis &&
js.Job.Def.Tag != newJobMsg.job.Def.Tag {
log.WithFields(lf).Infof("Matching basis, killing prevJobID=%s", js.Job.Id)
go s.KillJob(js.Job.Id) // we are in the main thread, this function must be called async.
}
// If we have an existing job with this requestor/tag combination, make sure we use its priority level.
// Not an error since we can consider priority to be a suggestion which we'll handle contextually.
if js.Job.Def.Tag == newJobMsg.job.Def.Tag &&
js.Job.Def.Basis != newJobMsg.job.Def.Basis &&
js.Job.Def.Priority != newJobMsg.job.Def.Priority {
log.WithFields(lf).Info("Overriding job priority to match previous requestor/tag priority")
newJobMsg.job.Def.Priority = js.Job.Def.Priority
}
}
} else if newJobMsg.job.Def.Priority > MaxPriority {
// Priorities greater than 2 are disabled in job_state.go.
jd := newJobMsg.job.Def
log.Infof("Overriding job priority %d to respect max priority of %d (higher priority is untested and disabled)"+
"Requestor:%s, JobType:%s, Tag:%s, Basis:%s, Priority:%d, numTasks: %d",
jd.Priority, MaxPriority, jd.Requestor, jd.JobType, jd.Tag, jd.Basis, jd.Priority, len(jd.Tasks))
newJobMsg.job.Def.Priority = MaxPriority
}
reqToClassMap, _ := s.GetRequestorToClassMap()
jc := GetRequestorClass(newJobMsg.job.Def.Requestor, reqToClassMap)
js := newJobState(newJobMsg.job, jc, newJobMsg.saga, s.taskDurations, s.tasksByJobClassAndStartTimeSec, s.durationKeyExtractorFn)
s.inProgressJobs = append(s.inProgressJobs, js)
sort.Sort(sort.Reverse(taskStatesByDuration(js.Tasks)))
req := newJobMsg.job.Def.Requestor
if _, ok := s.requestorMap[req]; !ok {
s.requestorMap[req] = []*jobState{}
}
s.requestorMap[req] = append(s.requestorMap[req], js)
log.WithFields(lf).Info("Created new job")
default:
if receivedJob {
total, completed, running = s.getSchedulerTaskCounts()
log.WithFields(
log.Fields{
"unscheduledTasks": total - completed - running,
"runningTasks": running,
"completedTasks": completed,
"totalTasks": total,
}).Info("Added jobs")
}
return
}
}
}