in scheduler/server/stateful_scheduler.go [685:725]
// checkJobsLoop drains every request currently queued on s.checkJobCh,
// validates each job definition, and sends the outcome (nil when the job is
// acceptable) on that request's resultCh. It never blocks waiting for new
// requests: as soon as the channel has nothing ready, the method returns.
//
// A job is rejected when any of the following holds, checked in this order:
//   - its requestor is not yet in s.requestorMap and the map already holds
//     s.config.MaxRequestors entries;
//   - its requestor already has s.config.MaxJobsPerRequestor jobs;
//   - its priority falls outside [domain.P0, domain.P2];
//   - two of its tasks share a TaskID;
//   - it has a non-empty Basis and its Tag appears in the requestor/basis
//     history without being the most recent entry.
func (s *statefulScheduler) checkJobsLoop() {
	defer s.stat.Latency(stats.SchedCheckJobsLoopLatency_ms).Time().Stop()
	for {
		select {
		case msg := <-s.checkJobCh:
			job := msg.jobDef
			var verdict error

			// For an unknown requestor, existing is the zero value and has length 0.
			existing, known := s.requestorMap[job.Requestor]
			switch {
			case !known && len(s.requestorMap) >= s.config.MaxRequestors:
				verdict = fmt.Errorf("exceeds max number of requestors: %s (%d)", job.Requestor, s.config.MaxRequestors)
			case len(existing) >= s.config.MaxJobsPerRequestor:
				verdict = fmt.Errorf("exceeds max jobs per requestor: %s (%d)", job.Requestor, s.config.MaxJobsPerRequestor)
			case job.Priority < domain.P0 || job.Priority > domain.P2:
				verdict = fmt.Errorf("invalid priority %d, must be between 0-2 inclusive", job.Priority)
			default:
				// Reject the job at the first task ID that repeats.
				seen := make(map[string]struct{}, len(job.Tasks))
				for _, task := range job.Tasks {
					if _, dup := seen[task.TaskID]; dup {
						verdict = fmt.Errorf("invalid dup taskID %s", task.TaskID)
						break
					}
					seen[task.TaskID] = struct{}{}
				}
			}

			if verdict == nil && job.Basis != "" {
				// The history is keyed by requestor and basis concatenated.
				historyKey := job.Requestor + job.Basis
				history := getRequestorHistory(s.requestorHistory, historyKey)
				// Short-circuit matters: history is only indexed once the tag
				// is known to be present, so it cannot be empty at that point.
				if !stringInSlice(job.Tag, history) || history[len(history)-1] == job.Tag {
					// Tag is either unseen or the latest one: record it.
					addOrUpdateRequestorHistory(s.requestorHistory, historyKey, job.Tag)
				} else {
					// Tag was seen before but has since been superseded.
					latest := history[len(history)-1]
					verdict = fmt.Errorf("expired tag=%s for basis=%s. Expected either tag=%s or new tag",
						job.Tag, job.Basis, latest)
				}
			}

			msg.resultCh <- verdict
		default:
			// Nothing left to check right now.
			return
		}
	}
}