func (s *statefulScheduler) scheduleTasks()

in scheduler/server/stateful_scheduler.go [895:1127]


func (s *statefulScheduler) scheduleTasks() {
	// Calculate the list of task-to-node assignments and start running all of those tasks.
	// Pass nil config so taskScheduler can determine the most appropriate values itself.
	defer s.stat.Latency(stats.SchedScheduleTasksLatency_ms).Time().Stop()
	taskAssignments := s.getTaskAssignments()
	for _, ta := range taskAssignments {
		// Set up variables for async functions & callback
		task := ta.task
		nodeSt := ta.nodeSt
		jobID := task.JobId
		taskID := task.TaskId
		requestor := s.getJob(jobID).Job.Def.Requestor
		jobType := s.getJob(jobID).Job.Def.JobType
		tag := s.getJob(jobID).Job.Def.Tag
		taskDef := task.Def
		taskDef.JobID = jobID
		taskDef.Tag = tag
		jobState := s.getJob(jobID)
		sa := jobState.Saga
		rs := s.runnerFactory(nodeSt.node)
		durationID := s.durationKeyExtractorFn(taskID)

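		// Once a task has hit the per-task retry cap, the runner marks it complete even if it fails (see markCompleteOnFailure below).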
		preventRetries := task.NumTimesTried >= s.config.MaxRetriesPerTask

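		// Look up the historical average duration for this duration key, for logging below; -1 means no history yet.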
		avgDur := -1
		iface, ok := s.taskDurations.Get(durationID)
		if ok {
			ad, ok := iface.(*averageDuration)
			if !ok {
				log.Errorf("getting task duration, object was not *averageDuration type! (it is %s)", reflect.TypeOf(iface))
			} else {
				avgDur = int(ad.duration)
			}
		}

		log.WithFields(
			log.Fields{
				"jobID":       jobID,
				"taskID":      taskID,
				"node":        nodeSt.node,
				"requestor":   requestor,
				"jobType":     jobType,
				"tag":         tag,
				"taskDef":     taskDef,
				"durationID":  durationID,
				"avgDuration": fmt.Sprintf("%d (sec)", avgDur),
			}).Info("Starting taskRunner")

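		// Build the taskRunner that will drive this task on the assigned node, carrying the saga, runner service, timeouts, and log tags it needs.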
		tRunner := &taskRunner{
			saga:   sa,
			runner: rs,
			stat:   s.stat,

			defaultTaskTimeout:    s.config.DefaultTaskTimeout,
			taskTimeoutOverhead:   s.config.TaskTimeoutOverhead,
			runnerRetryTimeout:    s.config.RunnerRetryTimeout,
			runnerRetryInterval:   s.config.RunnerRetryInterval,
			markCompleteOnFailure: preventRetries,

			LogTags: tags.LogTags{
				JobID:  jobID,
				TaskID: taskID,
				Tag:    tag,
			},

			task:   taskDef,
			nodeSt: nodeSt,

			abortCh:      make(chan abortReq, 1),
			queryAbortCh: make(chan interface{}, 1),

			startTime: time.Now(),
		}

		// mark the task as started in the jobState and record its taskRunner
		jobState.taskStarted(taskID, tRunner)

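		// Run the task asynchronously; the callback below fires when the task finishes or fails, and handles duration tracking, retries, saga updates, and freeing the node.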
		s.asyncRunner.RunAsync(
			tRunner.run,
			func(err error) {
				defer rs.Release()
				// Update the average duration for this task so that, for new jobs, the likely long-running tasks can be scheduled first.
				if err == nil || err.(*taskError).st.State == runner.TIMEDOUT ||
					(err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) {
					addOrUpdateTaskDuration(s.taskDurations, durationID, time.Since(tRunner.startTime))
				}

				// If the node is absent, or was deleted and then re-added, we need to clean up selectively.
				// The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup.
				// We need the fake value so we don't clobber any new job assignments to that nodeId.
				nodeId := nodeSt.node.Id()
				nodeStInstance, ok := s.clusterState.getNodeState(nodeId)
				nodeAbsent := !ok
				nodeReAdded := false
				if !nodeAbsent {
					nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh)
				}
				nodeStChanged := nodeAbsent || nodeReAdded
				preempted := false

				if nodeStChanged {
					nodeId = nodeId + ":ERROR"
					log.WithFields(
						log.Fields{
							"node":        nodeSt.node,
							"jobID":       jobID,
							"taskID":      taskID,
							"runningJob":  nodeSt.runningJob,
							"runningTask": nodeSt.runningTask,
							"requestor":   requestor,
							"jobType":     jobType,
							"tag":         tag,
						}).Info("Task *node* lost, cleaning up.")
				}
				if nodeReAdded {
					preempted = true
				}

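				// Classify the outcome: an ABORTED state means a job kill request ended the task; flaky is set below when the runner itself errored rather than the task failing.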
				flaky := false
				aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED)
				if err != nil {
					// Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes.
					// TODO - we no longer set a node as flaky on failed status.
					// In practice, we've observed that this results in checkout failures causing
					// nodes to drop out of the cluster and reduce capacity to no benefit.
					// A more comprehensive solution would be to overhaul this behavior.
					taskErr := err.(*taskError)
					flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED)

					msg := "Error running task (will be retried):"
					if aborted {
						msg = "Error running task, but job kill request received (will not retry):"
						err = nil
					} else {
						if preventRetries {
							msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask)
							err = nil
						} else {
							jobState.errorRunningTask(taskID, err, preempted)
						}
					}
					log.WithFields(
						log.Fields{
							"jobId":     jobID,
							"taskId":    taskID,
							"err":       taskErr,
							"cmd":       strings.Join(taskDef.Argv, " "),
							"requestor": requestor,
							"jobType":   jobType,
							"tag":       tag,
						}).Info(msg)

					// If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds.
					if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil {
						log.WithFields(
							log.Fields{
								"jobId":  jobID,
								"taskId": taskID,
							}).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ")
						// TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask
						go func() {
							for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) {
								time.Sleep(time.Second)
							}
							log.WithFields(
								log.Fields{
									"jobId":     jobID,
									"taskId":    taskID,
									"requestor": requestor,
									"jobType":   jobType,
									"tag":       tag,
								}).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ")
						}()
					}
				}
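				// Success, an abort, or an error cleared above (kill request or max retries hit) is terminal: mark the task completed.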
				if err == nil || aborted {
					log.WithFields(
						log.Fields{
							"jobId":     jobID,
							"taskId":    taskID,
							"command":   strings.Join(taskDef.Argv, " "),
							"requestor": requestor,
							"jobType":   jobType,
							"tag":       tag,
						}).Info("Ending task.")
					jobState.taskCompleted(taskID, true)
				}

				// Update cluster state: this node is now free, and record whether we consider the runner flaky.
				log.WithFields(
					log.Fields{
						"jobId":     jobID,
						"taskId":    taskID,
						"node":      nodeSt.node,
						"flaky":     flaky,
						"requestor": requestor,
						"jobType":   jobType,
						"tag":       tag,
					}).Info("Freeing node, removed job.")
				s.clusterState.taskCompleted(nodeId, flaky)

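				// Aggregate task counts across all in-progress jobs for the summary log lines below.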
				total := 0
				completed := 0
				running := 0
				for _, job := range s.inProgressJobs {
					total += len(job.Tasks)
					completed += job.TasksCompleted
					running += job.TasksRunning
				}
				log.WithFields(
					log.Fields{
						"jobId":     jobID,
						"running":   jobState.TasksRunning,
						"completed": jobState.TasksCompleted,
						"total":     len(jobState.Tasks),
						"isdone":    jobState.TasksCompleted == len(jobState.Tasks),
						"requestor": requestor,
						"jobType":   jobType,
						"tag":       tag,
					}).Info("Job task summary")
				log.WithFields(
					log.Fields{
						"running":   running,
						"completed": completed,
						"total":     total,
						"alldone":   completed == total,
						"requestor": requestor,
						"jobType":   jobType,
						"tag":       tag,
					}).Info("Jobs task summary")
			})
	}
}
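
The duration-tracking helpers referenced above (the s.taskDurations cache, the *averageDuration values, and addOrUpdateTaskDuration) are defined elsewhere in the package. The sketch below is one plausible shape for that bookkeeping, assuming a minimal Get/Add cache interface and a running average kept in seconds so the "%d (sec)" log field stays consistent; the names are taken from the calls in scheduleTasks, but the cache type, the count field, and the averaging math are illustrative assumptions, not the repo's implementation.

package server // package name assumed from the scheduler/server directory; sketch is standalone

import "time"

// durationCache is an assumed minimal view of s.taskDurations: only the Get/Add
// operations used by scheduleTasks. The real cache type may differ (e.g. an LRU).
type durationCache interface {
	Get(key interface{}) (interface{}, bool)
	Add(key, value interface{})
}

// averageDuration holds a running average of observed durations for one duration key.
// The duration field name matches the ad.duration access above; storing it in seconds
// keeps the "%d (sec)" log field consistent. The count field is an assumption.
type averageDuration struct {
	count    int64
	duration float64 // running average, in seconds
}

// update folds one new observation into the running average (incremental mean).
func (a *averageDuration) update(d time.Duration) {
	a.count++
	a.duration += (d.Seconds() - a.duration) / float64(a.count)
}

// addOrUpdateTaskDuration mirrors the call made in the completion callback above:
// create an entry the first time a duration key is seen, otherwise update its average.
func addOrUpdateTaskDuration(cache durationCache, key string, d time.Duration) {
	iface, ok := cache.Get(key)
	if !ok {
		cache.Add(key, &averageDuration{count: 1, duration: d.Seconds()})
		return
	}
	if ad, ok := iface.(*averageDuration); ok {
		ad.update(d)
	}
}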