func()

in scheduler/server/stateful_scheduler.go [730:807]


func (s *statefulScheduler) addJobsLoop() {
	defer s.stat.Latency(stats.SchedAddJobsLoopLatency_ms).Time().Stop()
	var (
		receivedJob bool
		total       int
		completed   int
		running     int
	)
	for {
		select {
		case newJobMsg := <-s.addJobCh:
			receivedJob = true
			lf := log.Fields{
				"jobID":     newJobMsg.job.Id,
				"requestor": newJobMsg.job.Def.Requestor,
				"jobType":   newJobMsg.job.Def.JobType,
				"tag":       newJobMsg.job.Def.Tag,
				"basis":     newJobMsg.job.Def.Basis,
				"priority":  newJobMsg.job.Def.Priority,
				"numTasks":  len(newJobMsg.job.Def.Tasks),
			}
			if _, ok := s.requestorMap[newJobMsg.job.Def.Requestor]; ok {
				for _, js := range s.requestorMap[newJobMsg.job.Def.Requestor] {
					// Kill any prior jobs that share the same requestor and non-empty basis string,
					// but tag should be different so we don't kill the original jobs when doing retries for example.
					// (retries share the same requestor, basis, and tag values).
					if newJobMsg.job.Def.Basis != "" &&
						js.Job.Def.Basis == newJobMsg.job.Def.Basis &&
						js.Job.Def.Tag != newJobMsg.job.Def.Tag {
						log.WithFields(lf).Infof("Matching basis, killing prevJobID=%s", js.Job.Id)
						go s.KillJob(js.Job.Id) // we are in the main thread, this function must be called async.
					}

					// If we have an existing job with this requestor/tag combination, make sure we use its priority level.
					// Not an error since we can consider priority to be a suggestion which we'll handle contextually.
					if js.Job.Def.Tag == newJobMsg.job.Def.Tag &&
						js.Job.Def.Basis != newJobMsg.job.Def.Basis &&
						js.Job.Def.Priority != newJobMsg.job.Def.Priority {
						log.WithFields(lf).Info("Overriding job priority to match previous requestor/tag priority")
						newJobMsg.job.Def.Priority = js.Job.Def.Priority
					}
				}
			} else if newJobMsg.job.Def.Priority > MaxPriority {
				// Priorities greater than 2 are disabled in job_state.go.
				jd := newJobMsg.job.Def
				log.Infof("Overriding job priority %d to respect max priority of %d (higher priority is untested and disabled)"+
					"Requestor:%s, JobType:%s, Tag:%s, Basis:%s, Priority:%d, numTasks: %d",
					jd.Priority, MaxPriority, jd.Requestor, jd.JobType, jd.Tag, jd.Basis, jd.Priority, len(jd.Tasks))
				newJobMsg.job.Def.Priority = MaxPriority
			}

			reqToClassMap, _ := s.GetRequestorToClassMap()
			jc := GetRequestorClass(newJobMsg.job.Def.Requestor, reqToClassMap)
			js := newJobState(newJobMsg.job, jc, newJobMsg.saga, s.taskDurations, s.tasksByJobClassAndStartTimeSec, s.durationKeyExtractorFn)
			s.inProgressJobs = append(s.inProgressJobs, js)

			sort.Sort(sort.Reverse(taskStatesByDuration(js.Tasks)))
			req := newJobMsg.job.Def.Requestor
			if _, ok := s.requestorMap[req]; !ok {
				s.requestorMap[req] = []*jobState{}
			}
			s.requestorMap[req] = append(s.requestorMap[req], js)
			log.WithFields(lf).Info("Created new job")
		default:
			if receivedJob {
				total, completed, running = s.getSchedulerTaskCounts()
				log.WithFields(
					log.Fields{
						"unscheduledTasks": total - completed - running,
						"runningTasks":     running,
						"completedTasks":   completed,
						"totalTasks":       total,
					}).Info("Added jobs")
			}
			return
		}
	}
}