func()

in scheduler/server/stateful_scheduler.go [685:725]


// checkJobsLoop drains every job-check request currently queued on
// s.checkJobCh and returns as soon as no further request is immediately
// available (it never blocks waiting for a new one).
//
// For each request the job definition is validated in this order, stopping at
// the first failure:
//  1. an unknown requestor must not push the requestor count past
//     s.config.MaxRequestors;
//  2. the requestor must have fewer than s.config.MaxJobsPerRequestor jobs;
//  3. the priority must lie in [domain.P0, domain.P2];
//  4. task IDs within the job must be unique.
//
// If those checks pass and the job carries a non-empty Basis, the tag is
// checked against the requestor+basis history: a tag that is already in the
// history but is not the most recent entry is rejected as expired; otherwise
// the history is updated with the tag.
//
// The outcome (nil on success) is sent on the request's resultCh. The total
// time spent in the call is recorded under stats.SchedCheckJobsLoopLatency_ms.
func (s *statefulScheduler) checkJobsLoop() {
	defer s.stat.Latency(stats.SchedCheckJobsLoopLatency_ms).Time().Stop()
	for {
		select {
		default:
			// Nothing pending right now; let the caller get on with its work.
			return
		case msg := <-s.checkJobCh:
			def := msg.jobDef
			var err error

			existingJobs, known := s.requestorMap[def.Requestor]
			switch {
			case !known && len(s.requestorMap) >= s.config.MaxRequestors:
				err = fmt.Errorf("exceeds max number of requestors: %s (%d)", def.Requestor, s.config.MaxRequestors)
			case len(existingJobs) >= s.config.MaxJobsPerRequestor:
				err = fmt.Errorf("exceeds max jobs per requestor: %s (%d)", def.Requestor, s.config.MaxJobsPerRequestor)
			case def.Priority < domain.P0 || def.Priority > domain.P2:
				err = fmt.Errorf("invalid priority %d, must be between 0-2 inclusive", def.Priority)
			default:
				// Reject the job on the first task ID that appears twice.
				taskIDs := map[string]bool{}
				for _, task := range def.Tasks {
					if taskIDs[task.TaskID] {
						err = fmt.Errorf("invalid dup taskID %s", task.TaskID)
						break
					}
					taskIDs[task.TaskID] = true
				}
			}

			if err == nil && def.Basis != "" {
				// History is tracked per requestor+basis pair.
				historyKey := def.Requestor + def.Basis
				history := getRequestorHistory(s.requestorHistory, historyKey)

				// A tag seen before is only acceptable while it is still the
				// latest one recorded. The index is safe because membership
				// is evaluated first and short-circuits on a miss.
				expired := stringInSlice(def.Tag, history) && history[len(history)-1] != def.Tag
				if !expired {
					addOrUpdateRequestorHistory(s.requestorHistory, historyKey, def.Tag)
				} else {
					err = fmt.Errorf("expired tag=%s for basis=%s. Expected either tag=%s or new tag",
						def.Tag, def.Basis, history[len(history)-1])
				}
			}

			msg.resultCh <- err
		}
	}
}