scheduler/server/task_runner.go
package server

import (
	"fmt"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/twitter/scoot/common/log/tags"
	"github.com/twitter/scoot/common/stats"
	"github.com/twitter/scoot/runner"
	"github.com/twitter/scoot/saga"
	"github.com/twitter/scoot/scheduler/domain"
	worker "github.com/twitter/scoot/worker/domain"
)
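
// DeadLetterTrailer is appended to a task's status error when the scheduler
// gives up on retries and logs the task as completed despite the failure.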
const DeadLetterTrailer = " -> Error(s) encountered, canceling task."

func emptyStatusError(jobId string, taskId string, err error) string {
	return fmt.Sprintf("Empty run status, jobId: %s, taskId: %s, err: %v", jobId, taskId, err)
}
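
// abortReq is sent on taskRunner.abortCh to request that the current run be
// stopped; endTask indicates the task should be finalized (dead-lettered)
// rather than left for the scheduler to retry.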
type abortReq struct {
	endTask bool
	err     string
}
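
// taskRunner runs a single task on its assigned worker node and records the
// task's lifecycle (StartTask/EndTask) in the SagaLog.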
type taskRunner struct {
	saga                  *saga.Saga
	runner                runner.Service
	stat                  stats.StatsReceiver
	markCompleteOnFailure bool
	taskTimeoutOverhead   time.Duration // How long to wait for a response after the task has timed out.
	defaultTaskTimeout    time.Duration // Use this timeout as the default for any cmds that don't have one.
	runnerRetryTimeout    time.Duration // How long to keep retrying a runner req.
	runnerRetryInterval   time.Duration // How long to sleep between runner req retries.
	tags.LogTags
	task         domain.TaskDefinition
	nodeSt       *nodeState
	abortCh      chan abortReq    // Primary channel to check for aborts.
	queryAbortCh chan interface{} // Secondary channel to pass to blocking query.
	startTime    time.Time
}

// taskError is the custom error returned from run() so the scheduler has more context.
type taskError struct {
	sagaErr   error
	runnerErr error
	resultErr error // Note: resultErr is the error from trying to get the results of the command, not an error from the command.
	st        runner.RunStatus
}

func (t *taskError) Error() string {
	return fmt.Sprintf("TaskError: saga: %v ### runner: %v ### result: %v, state: %s", t.sagaErr, t.runnerErr, t.resultErr, t.st.State)
}

// run executes the task on the specified worker and updates the SagaLog appropriately. It returns an
// error if anything goes wrong running the task, getting the task results, or writing to the SagaLog.
// (Note: if the task's command itself fails when run, that is not considered an error here.)
// This method blocks until the task completes and all saga messages are logged.
func (r *taskRunner) run() error {
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"node":   r.nodeSt.node,
			"task":   r.task,
			"tag":    r.Tag,
		}).Info("Starting task")
	taskErr := &taskError{}

	// Log StartTask message to SagaLog.
	if err := r.logTaskStatus(nil, saga.StartTask); err != nil {
		taskErr.sagaErr = err
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}

	// Run and update taskErr with the results.
	st, end, err := r.runAndWait()
	taskErr.runnerErr = err
	taskErr.st = st

	// We got a good message back, but it indicates an error. Update taskErr accordingly.
	completed := (st.State == runner.COMPLETE)
	if err == nil && !completed {
		switch st.State {
		case runner.FAILED, runner.UNKNOWN:
			// runnerErr can be thrift related above, or in this case some other failure that's likely our fault.
			err = fmt.Errorf("%s", st.Error)
			taskErr.runnerErr = err
		default:
			// resultErr can be (ABORTED, TIMEDOUT), which indicates a transient or user-related concern.
			err = fmt.Errorf("%s", st.State.String())
			taskErr.resultErr = err
		}
	}

	// We should write to the SagaLog if there's no error, or there's an error but the caller won't be retrying.
	shouldDeadLetter := (err != nil && (end || r.markCompleteOnFailure))
	shouldLog := (err == nil) || shouldDeadLetter

	// Update taskErr's Error to indicate we got an empty status back.
	if taskErr.st.State == runner.UNKNOWN {
		taskErr.st.State = runner.FAILED
		taskErr.st.Error = emptyStatusError(r.JobID, r.TaskID, err)
	}

	if shouldDeadLetter {
		log.WithFields(
			log.Fields{
				"jobID":  r.JobID,
				"taskID": r.TaskID,
				"err":    taskErr,
				"tag":    r.Tag,
				"node":   r.nodeSt.node,
			}).Info("Error running job, dead lettering task after max retries.")
		taskErr.st.Error += DeadLetterTrailer
	}

	log.WithFields(
		log.Fields{
			"node":       r.nodeSt.node,
			"log":        shouldLog,
			"runID":      taskErr.st.RunID,
			"state":      taskErr.st.State,
			"stdout":     taskErr.st.StdoutRef,
			"stderr":     taskErr.st.StderrRef,
			"snapshotID": taskErr.st.SnapshotID,
			"exitCode":   taskErr.st.ExitCode,
			"error":      taskErr.st.Error,
			"jobID":      taskErr.st.JobID,
			"taskID":     taskErr.st.TaskID,
			"tag":        taskErr.st.Tag,
			"err":        taskErr,
		}).Info("End task")

	if !shouldLog {
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}
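
	// Log the final status to the SagaLog as an EndTask message.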
	err = r.logTaskStatus(&taskErr.st, saga.EndTask)
	taskErr.sagaErr = err
	if taskErr.sagaErr == nil && taskErr.runnerErr == nil && taskErr.resultErr == nil {
		r.stat.Counter(stats.SchedCompletedTaskCounter).Inc(1)
		return nil
	} else {
		r.stat.Counter(stats.SchedFailedTaskCounter).Inc(1)
		return taskErr
	}
}

// Run cmd and, if there's a runner error (ex: thrift), re-run/re-query until completion, retry timeout, or cmd timeout.
func (r *taskRunner) runAndWait() (runner.RunStatus, bool, error) {
	cmd := &r.task.Command
	if cmd.Timeout == 0 {
		cmd.Timeout = r.defaultTaskTimeout
	}
	cmdEndTime := time.Now().Add(cmd.Timeout).Add(r.taskTimeoutOverhead)
	elapsedRetryDuration := time.Duration(0)
	var st runner.RunStatus
	var err error
	var id runner.RunID
	var end bool
	cmd.TaskID = r.TaskID
	cmd.JobID = r.JobID

	// If a runner call returns an error, we treat it as an infrastructure error and will repeatedly retry.
	// If a runner call returns a result indicating a cmd error, we fail and return.
	//TODO(jschiller): add a Nonce to Cmd so worker knows what to do if it sees a dup command?
	log.WithFields(
		log.Fields{
			"jobID":  r.JobID,
			"taskID": r.TaskID,
			"tag":    r.Tag,
			"node":   r.nodeSt.node,
		}).Info("runAndWait()")
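
	// Start the run, checking for a scheduler-initiated abort before and after
	// the Run() call; infrastructure errors are retried until the retry budget
	// (runnerRetryTimeout) is exhausted.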
	for {
		// was a job kill request received before we could start the run?
		if aborted, req := r.abortRequested(); aborted {
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("The run was aborted by the scheduler before it was sent to a worker")
			return st, req.endTask, nil
		}

		// send the command to the worker
		st, err = r.runner.Run(cmd)

		// was a job kill request received while starting the run?
		if aborted, req := r.abortRequested(); aborted {
			if err == nil { // we should have a status with runId, abort the run
				r.runner.Abort(st.RunID)
			}
			st = runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
			st.Error = req.err
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
				}).Info("Initial run attempts aborted by the scheduler")
			return st, req.endTask, nil
		}

		if err != nil && elapsedRetryDuration+r.runnerRetryInterval < r.runnerRetryTimeout {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying run()")
			r.stat.Counter(stats.SchedTaskStartRetries).Inc(1)
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if err != nil || st.State.IsDone() {
			return st, false, err
		}
		break
	}
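
	// The run was accepted by the worker; record its ID so it can be queried and aborted.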
	id = st.RunID

	// Wait for the process to start running, log it, then wait for it to finish.
	elapsedRetryDuration = 0
	includeRunning := true
	for {
		st, end, err = r.queryWithTimeout(id, cmdEndTime, includeRunning)
		elapsed := elapsedRetryDuration + r.runnerRetryInterval
		if (err != nil && elapsed >= r.runnerRetryTimeout) || (err == nil && st.State.IsDone()) {
			if st.State != runner.COMPLETE {
				r.runner.Abort(id)
			}
			break
		} else if err != nil {
			log.WithFields(
				log.Fields{
					"jobID":  r.JobID,
					"taskID": r.TaskID,
					"tag":    r.Tag,
					"node":   r.nodeSt.node,
					"err":    err,
				}).Info("Retrying query")
			time.Sleep(r.runnerRetryInterval)
			elapsedRetryDuration += r.runnerRetryInterval
			continue
		} else if includeRunning {
			// It's running, but not done, so we want to log a second StartTask that includes
			// its status, so a watcher can go investigate. Strictly speaking this is optional
			// in that we've already logged a start task and our only obligation is to log a
			// corresponding end task.
			log.WithFields(
				log.Fields{
					"jobID":     r.JobID,
					"taskID":    r.TaskID,
					"node":      r.nodeSt.node,
					"runStatus": st,
					"tag":       r.Tag,
				}).Debug("Update task")
			r.logTaskStatus(&st, saga.StartTask)
			includeRunning = false
		}
	}
	return st, end, err
}
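
// queryWithTimeout blocks until the run reaches a state in the query mask, the
// deadline derived from endTime expires, or an abort request arrives.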
func (r *taskRunner) queryWithTimeout(id runner.RunID, endTime time.Time, includeRunning bool) (runner.RunStatus, bool, error) {
	// Set up the query request.
	q := runner.Query{Runs: []runner.RunID{id}, States: runner.DONE_MASK}
	if includeRunning {
		q.States = q.States | runner.RUNNING_MASK
	}
	timeout := time.Until(endTime)
	if timeout < 0 {
		timeout = 0
	}
	// The semantics of timeout change here. Before, zero meant use the default; here it means return immediately.
	w := runner.Wait{Timeout: timeout, AbortCh: r.queryAbortCh}
	log.WithFields(
		log.Fields{
			"jobID":   r.JobID,
			"taskID":  r.TaskID,
			"timeout": timeout,
			"tag":     r.Tag,
			"node":    r.nodeSt.node,
		}).Infof("Query(includeRunning=%t)", includeRunning)

	// Issue a query that blocks until we get a response, w's timeout expires, or an abort arrives (from a job kill).
	// If the abort request triggers the Query() to return, Query() will put a new
	// abort request on the channel to replace the one it consumed, so we know to send
	// an abort to the runner below.
	sts, _, err := r.runner.Query(q, w)
	if aborted, req := r.abortRequested(); aborted {
		st := runner.AbortStatus(id, tags.LogTags{JobID: r.JobID, TaskID: r.TaskID})
		st.Error = req.err
		return st, req.endTask, nil
	}
	if err != nil {
		return runner.RunStatus{}, false, err
	}
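
	// A single result is the run's status; no matching result means the wait ended
	// without the run reaching a queried state, so report TIMEDOUT.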
	var st runner.RunStatus
	if len(sts) == 1 {
		st = sts[0]
	} else {
		st = runner.RunStatus{
			RunID: id,
			State: runner.TIMEDOUT,
		}
	}
	return st, false, nil
}
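
// logTaskStatus serializes the given run status (if any) and logs a StartTask
// or EndTask message for this task to the SagaLog.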
func (r *taskRunner) logTaskStatus(st *runner.RunStatus, msgType saga.SagaMessageType) error {
	log.WithFields(
		log.Fields{
			"msgType": msgType,
			"jobID":   r.JobID,
			"taskID":  r.TaskID,
			"node":    r.nodeSt.node,
			"tag":     r.Tag,
		}).Info("TryLogTaskStatus")
	var statusAsBytes []byte
	var err error
	if st != nil {
		statusAsBytes, err = worker.SerializeProcessStatus(*st)
		if err != nil {
			r.stat.Counter(stats.SchedFailedTaskSerializeCounter).Inc(1) // TODO errata metric - remove if unused
			return err
		}
	}
	switch msgType {
	case saga.StartTask:
		err = r.saga.StartTask(r.TaskID, statusAsBytes)
	case saga.EndTask:
		err = r.saga.EndTask(r.TaskID, statusAsBytes)
	default:
		err = fmt.Errorf("unexpected saga message type: %v", msgType)
	}
	return err
}
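
// abortRequested does a non-blocking check of abortCh for a pending abort request.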
func (r *taskRunner) abortRequested() (aborted bool, req abortReq) {
	select {
	case req := <-r.abortCh:
		log.WithFields(
			log.Fields{
				"jobID":   r.JobID,
				"taskID":  r.TaskID,
				"node":    r.nodeSt.node,
				"endTask": req.endTask,
				"tag":     r.Tag,
			}).Info("Abort requested")
		return true, req
	default:
		return false, abortReq{}
	}
}
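
// Abort asks the taskRunner to stop its current task. One request is sent on the
// primary abort channel and one on the query abort channel so that a blocking
// Query() call also returns.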
func (r *taskRunner) Abort(endTask bool, err string) {
	r.abortCh <- abortReq{endTask, err}
	r.queryAbortCh <- nil
}