in scheduler/server/task_runner.go [66:158]
// run executes the task once (via runAndWait) and records its outcome.
//
// Sequence:
//  1. Log a StartTask message through logTaskStatus; on failure return
//     immediately with the saga error set.
//  2. Run the task and capture the returned status and error.
//  3. If the call itself succeeded but the status is not COMPLETE, convert
//     the status into a runnerErr (FAILED, UNKNOWN) or a resultErr (any
//     other state).
//  4. Log an EndTask message only when there was no error, or when there was
//     an error and either runAndWait reported `end` or markCompleteOnFailure
//     is set (the "dead letter" case).
//
// It returns nil only when no runner, result or saga error occurred;
// otherwise it returns the populated *taskError. SchedCompletedTaskCounter is
// incremented on the nil return and SchedFailedTaskCounter on every error
// return.
func (r *taskRunner) run() error {
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"node":   r.nodeSt.node,
			"task":   r.task,
			"tag":    r.Tag,
		}).Info("Starting task")

	taskErr := &taskError{}

	// Log StartTask message to the saga log.
	if err := r.logTaskStatus(nil, saga.StartTask); err != nil {
		taskErr.sagaErr = err
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	// Run and update taskErr with the results.
	st, end, err := r.runAndWait()
	taskErr.runnerErr = err
	taskErr.st = st

	// We got a good message back, but it indicates an error. Update taskErr accordingly.
	if err == nil && st.State != runner.COMPLETE {
		switch st.State {
		case runner.FAILED, runner.UNKNOWN:
			// runnerErr can be thrift related above, or in this case some other failure that's likely our fault.
			// Use an explicit "%s" so any '%' in the status text is preserved
			// literally instead of being parsed as a formatting verb.
			err = fmt.Errorf("%s", st.Error)
			taskErr.runnerErr = err
		default:
			// resultErr can be (ABORTED,TIMEDOUT), which indicates a transient or user-related concern.
			err = fmt.Errorf("%s", st.State.String())
			taskErr.resultErr = err
		}
	}

	// We should write to sagalog if there's no error, or there's an error but the caller won't be retrying.
	shouldDeadLetter := err != nil && (end || r.markCompleteOnFailure)
	shouldLog := err == nil || shouldDeadLetter

	// Update taskErr's Error to indicate we got an empty status back.
	if taskErr.st.State == runner.UNKNOWN {
		taskErr.st.State = runner.FAILED
		taskErr.st.Error = emptyStatusError(r.JobID, r.TaskID, err)
	}

	if shouldDeadLetter {
		log.WithFields(
			log.Fields{
				"jobID":  r.JobID,
				"taskID": r.TaskID,
				"err":    taskErr,
				"tag":    r.Tag,
				"node":   r.nodeSt.node,
			}).Info("Error running job, dead lettering task after max retries.")
		taskErr.st.Error += DeadLetterTrailer
	}

	log.WithFields(
		log.Fields{
			"node":       r.nodeSt.node,
			"log":        shouldLog,
			"runID":      taskErr.st.RunID,
			"state":      taskErr.st.State,
			"stdout":     taskErr.st.StdoutRef,
			"stderr":     taskErr.st.StderrRef,
			"snapshotID": taskErr.st.SnapshotID,
			"exitCode":   taskErr.st.ExitCode,
			"error":      taskErr.st.Error,
			"jobID":      taskErr.st.JobID,
			"taskID":     taskErr.st.TaskID,
			"tag":        taskErr.st.Tag,
			"err":        taskErr,
		}).Info("End task")

	if !shouldLog {
		// shouldLog is false only when err != nil, and taskErr is never nil
		// here, so this path always counts as a failure.
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	taskErr.sagaErr = r.logTaskStatus(&taskErr.st, saga.EndTask)
	if taskErr.sagaErr != nil || taskErr.runnerErr != nil || taskErr.resultErr != nil {
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	// Return a literal nil so callers comparing against nil see success.
	r.stat.Counter(stats.SchedCompletedTaskCounter).Inc(1)
	return nil
}