func()

in scheduler/server/task_runner.go [161:282]


func (r *taskRunner) runAndWait() (runner.RunStatus, bool, error) {
	cmd := &r.task.Command
	if cmd.Timeout == 0 {
		cmd.Timeout = r.defaultTaskTimeout
	}
	cmdEndTime := time.Now().Add(cmd.Timeout).Add(r.taskTimeoutOverhead)
	elapsedRetryDuration := time.Duration(0)
	var st runner.RunStatus
	var err error
	var id runner.RunID
	var end bool
	cmd.TaskID = r.TaskID
	cmd.JobID = r.JobID
	// If runner call returns an error then we treat it as an infrastructure error and will repeatedly retry.
	// If runner call returns a result indicating cmd error we fail and return.
	//TODO(jschiller): add a Nonce to Cmd so worker knows what to do if it sees a dup command?
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"tag":    r.Tag,
			"node":   r.nodeSt.node,
		}).Info("runAndWait()")

	for {
		// was a job kill request received before we could start the run?
		if aborted, req := r.abortRequested(); aborted {
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("The run was aborted by the scheduler before it was sent to a worker")
			return st, req.endTask, nil
		}

		// send the command to the worker
		st, err = r.runner.Run(cmd)

		// was a job kill request received while starting the run?
		if aborted, req := r.abortRequested(); aborted {
			if err == nil { // we should have a status with runId, abort the run
				r.runner.Abort(st.RunID)
			}
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("Initial run attempts aborted by the scheduler")
			return st, req.endTask, nil
		}

		if err != nil && elapsedRetryDuration+r.runnerRetryInterval < r.runnerRetryTimeout {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying run()")
			r.stat.Counter(stats.SchedTaskStartRetries).Inc(1)
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if err != nil || st.State.IsDone() {
			return st, false, err
		}
		break
	}
	id = st.RunID

	// Wait for the process to start running, log it, then wait for it to finish.
	elapsedRetryDuration = 0
	includeRunning := true
	for {
		st, end, err = r.queryWithTimeout(id, cmdEndTime, includeRunning)
		elapsed := elapsedRetryDuration + r.runnerRetryInterval
		if (err != nil && elapsed >= r.runnerRetryTimeout) || (err == nil && st.State.IsDone()) {
			if st.State != runner.COMPLETE {
				r.runner.Abort(id)
			}
			break
		} else if err != nil {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying query")
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if includeRunning {
			// It's running, but not done, so we want to log a second StartTask that includes
			// its status, so a watcher can go investigate. Strictly speaking this is optional
			// in that we've already logged a start task and our only obligation is to log a
			// corresponding end task.
			log.WithFields(
				log.Fields{
					"jobID":     r.JobID,
					"taskID":    r.TaskID,
					"node":      r.nodeSt.node,
					"runStatus": st,
					"tag":       r.Tag,
				}).Debug("Update task")
			r.logTaskStatus(&st, saga.StartTask)
			includeRunning = false
		}
	}

	return st, end, err
}