func()

in scheduler/server/task_runner.go [66:158]


// run executes the task once and records its outcome.
//
// Steps, in order:
//  1. Log a StartTask message to the saga log; a failure here aborts the run.
//  2. Run the task via runAndWait and classify the result into a *taskError:
//     runnerErr for transport errors and FAILED/UNKNOWN states, resultErr for
//     any other non-COMPLETE state.
//  3. Log an EndTask message to the saga log when the task succeeded, or when
//     it failed and is being dead lettered (runAndWait reported end, or
//     r.markCompleteOnFailure is set).
//
// It returns nil only when the task completed and the EndTask message was
// logged successfully. In every other case it returns the populated
// *taskError and increments the failed-task counter.
func (r *taskRunner) run() error {
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"node":   r.nodeSt.node,
			"task":   r.task,
			"tag":    r.Tag,
		}).Info("Starting task")
	taskErr := &taskError{}

	// Log StartTask Message to SagaLog
	if err := r.logTaskStatus(nil, saga.StartTask); err != nil {
		taskErr.sagaErr = err
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	// Run and update taskErr with the results.
	st, end, err := r.runAndWait()
	taskErr.runnerErr = err
	taskErr.st = st

	// We got a good message back, but it indicates an error. Update taskErr accordingly.
	if err == nil && st.State != runner.COMPLETE {
		switch st.State {
		case runner.FAILED, runner.UNKNOWN:
			// runnerErr can be thrift related above, or in this case some other failure that's likely our fault.
			// The message is passed as an argument, never as the format string,
			// so a literal '%' in the runner's error text is preserved verbatim.
			err = fmt.Errorf("%s", st.Error)
			taskErr.runnerErr = err
		default:
			// resultErr can be (ABORTED,TIMEDOUT), which indicates a transient or user-related concern.
			err = fmt.Errorf("%s", st.State.String())
			taskErr.resultErr = err
		}
	}

	// We should write to sagalog if there's no error, or there's an error but the caller won't be retrying.
	shouldDeadLetter := err != nil && (end || r.markCompleteOnFailure)
	shouldLog := err == nil || shouldDeadLetter

	// Update taskErr's Error to indicate we got an empty status back
	if taskErr.st.State == runner.UNKNOWN {
		taskErr.st.State = runner.FAILED
		taskErr.st.Error = emptyStatusError(r.JobID, r.TaskID, err)
	}
	if shouldDeadLetter {
		log.WithFields(
			log.Fields{
				"jobID":  r.JobID,
				"taskID": r.TaskID,
				"err":    taskErr,
				"tag":    r.Tag,
				"node":   r.nodeSt.node,
			}).Info("Error running job, dead lettering task after max retries.")
		taskErr.st.Error += DeadLetterTrailer
	}

	log.WithFields(
		log.Fields{
			"node":       r.nodeSt.node,
			"log":        shouldLog,
			"runID":      taskErr.st.RunID,
			"state":      taskErr.st.State,
			"stdout":     taskErr.st.StdoutRef,
			"stderr":     taskErr.st.StderrRef,
			"snapshotID": taskErr.st.SnapshotID,
			"exitCode":   taskErr.st.ExitCode,
			"error":      taskErr.st.Error,
			"jobID":      taskErr.st.JobID,
			"taskID":     taskErr.st.TaskID,
			"tag":        taskErr.st.Tag,
			"err":        taskErr,
		}).Info("End task")

	if !shouldLog {
		// Reaching here implies err != nil and the task is not being dead
		// lettered, so this attempt always counts as a failure. taskErr itself
		// is never nil, so no nil check is needed.
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	taskErr.sagaErr = r.logTaskStatus(&taskErr.st, saga.EndTask)
	if taskErr.sagaErr != nil || taskErr.runnerErr != nil || taskErr.resultErr != nil {
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	// Return a literal nil (not taskErr) so callers see a nil error interface.
	r.stat.Counter(stats.SchedCompletedTaskCounter).Inc(1)
	return nil
}