in scheduler/server/task_runner.go [161:282]
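// runAndWait sends the task's command to the worker, retrying transient infrastructure
// errors, then polls the run until it finishes, is aborted, or times out.
// It returns the final run status, the end-task flag, and any error.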
func (r *taskRunner) runAndWait() (runner.RunStatus, bool, error) {
	cmd := &r.task.Command
	if cmd.Timeout == 0 {
		cmd.Timeout = r.defaultTaskTimeout
	}
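	// Overall deadline for the run: the command's timeout plus the scheduler's taskTimeoutOverhead allowance.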
	cmdEndTime := time.Now().Add(cmd.Timeout).Add(r.taskTimeoutOverhead)
	elapsedRetryDuration := time.Duration(0)
	var st runner.RunStatus
	var err error
	var id runner.RunID
	var end bool
	cmd.TaskID = r.TaskID
	cmd.JobID = r.JobID
	// If the runner call returns an error, we treat it as an infrastructure error and retry repeatedly.
	// If the runner call returns a result indicating a command error, we fail and return.
	//TODO(jschiller): add a Nonce to Cmd so worker knows what to do if it sees a dup command?
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"tag":    r.Tag,
			"node":   r.nodeSt.node,
		}).Info("runAndWait()")
	for {
		// Was a job kill request received before we could start the run?
		if aborted, req := r.abortRequested(); aborted {
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("The run was aborted by the scheduler before it was sent to a worker")
			return st, req.endTask, nil
		}
		// Send the command to the worker.
		st, err = r.runner.Run(cmd)
		// Was a job kill request received while starting the run?
		if aborted, req := r.abortRequested(); aborted {
			if err == nil { // We should have a status with a RunID, so abort the run on the worker.
				r.runner.Abort(st.RunID)
			}
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("Initial run attempts aborted by the scheduler")
			return st, req.endTask, nil
		}
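		// Retry infrastructure errors at runnerRetryInterval until the runnerRetryTimeout budget
		// is exhausted. If retries are exhausted, or the run already reached a done state, return;
		// otherwise the run started and we can stop retrying.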
		if err != nil && elapsedRetryDuration+r.runnerRetryInterval < r.runnerRetryTimeout {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying run()")
			r.stat.Counter(stats.SchedTaskStartRetries).Inc(1)
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if err != nil || st.State.IsDone() {
			return st, false, err
		}
		break
	}
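	// The run has been accepted by the worker; remember its RunID so we can poll it and abort it if needed.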
	id = st.RunID
	// Wait for the process to start running, log it, then wait for it to finish.
	elapsedRetryDuration = 0
	includeRunning := true
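	// Poll loop: query the worker (bounded by cmdEndTime) until the run reaches a done state,
	// retrying transient query errors at runnerRetryInterval up to runnerRetryTimeout.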
	for {
		st, end, err = r.queryWithTimeout(id, cmdEndTime, includeRunning)
		elapsed := elapsedRetryDuration + r.runnerRetryInterval
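		// Stop polling once query retries are exhausted or the run is done; if the run did not
		// finish with COMPLETE, ask the worker to abort it before breaking out.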
		if (err != nil && elapsed >= r.runnerRetryTimeout) || (err == nil && st.State.IsDone()) {
			if st.State != runner.COMPLETE {
				r.runner.Abort(id)
			}
			break
		} else if err != nil {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying query")
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if includeRunning {
			// The run is running but not done, so log a second StartTask that includes its
			// status so a watcher can go investigate. Strictly speaking this is optional:
			// we've already logged a start task, and our only obligation is to log a
			// corresponding end task.
			log.WithFields(
				log.Fields{
					"jobID":     r.JobID,
					"taskID":    r.TaskID,
					"node":      r.nodeSt.node,
					"runStatus": st,
					"tag":       r.Tag,
				}).Debug("Update task")
			r.logTaskStatus(&st, saga.StartTask)
			includeRunning = false
		}
	}
	return st, end, err
}