in scheduler/server/stateful_scheduler.go [895:1127]
func (s *statefulScheduler) scheduleTasks() {
// Calculate a list of task-to-node assignments and start running all of those tasks
// Pass nil config so taskScheduler can determine the most appropriate values itself.
defer s.stat.Latency(stats.SchedScheduleTasksLatency_ms).Time().Stop()
taskAssignments := s.getTaskAssignments()
for _, ta := range taskAssignments {
// Set up variables for async functions & callback
task := ta.task
nodeSt := ta.nodeSt
jobID := task.JobId
taskID := task.TaskId
requestor := s.getJob(jobID).Job.Def.Requestor
jobType := s.getJob(jobID).Job.Def.JobType
tag := s.getJob(jobID).Job.Def.Tag
taskDef := task.Def
taskDef.JobID = jobID
taskDef.Tag = tag
jobState := s.getJob(jobID)
sa := jobState.Saga
rs := s.runnerFactory(nodeSt.node)
durationID := s.durationKeyExtractorFn(taskID)
preventRetries := task.NumTimesTried >= s.config.MaxRetriesPerTask
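// Look up the average duration previously recorded for this durationID; -1 means no history yet.
// The value is only used for the "Starting taskRunner" log line below.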
avgDur := -1
iface, ok := s.taskDurations.Get(durationID)
if ok {
ad, ok := iface.(*averageDuration)
if !ok {
log.Errorf("getting task duration, object was not *averageDuration type! (it is %s)", reflect.TypeOf(ad))
} else {
avgDur = int(ad.duration)
}
}
log.WithFields(
log.Fields{
"jobID": jobID,
"taskID": taskID,
"node": nodeSt.node,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
"taskDef": taskDef,
"durationID": durationID,
"avgDuration": fmt.Sprintf("%d (sec)", avgDur),
}).Info("Starting taskRunner")
tRunner := &taskRunner{
saga: sa,
runner: rs,
stat: s.stat,
defaultTaskTimeout: s.config.DefaultTaskTimeout,
taskTimeoutOverhead: s.config.TaskTimeoutOverhead,
runnerRetryTimeout: s.config.RunnerRetryTimeout,
runnerRetryInterval: s.config.RunnerRetryInterval,
markCompleteOnFailure: preventRetries,
LogTags: tags.LogTags{
JobID: jobID,
TaskID: taskID,
Tag: tag,
},
task: taskDef,
nodeSt: nodeSt,
abortCh: make(chan abortReq, 1),
queryAbortCh: make(chan interface{}, 1),
startTime: time.Now(),
}
// mark the task as started in the jobState and record its taskRunner
jobState.taskStarted(taskID, tRunner)
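// Run the task asynchronously. The callback below fires once the task finishes and handles
// duration bookkeeping, node cleanup, and job state updates.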
s.asyncRunner.RunAsync(
tRunner.run,
func(err error) {
defer rs.Release()
// Update the average duration for this task so that, for future jobs, we can schedule the likely long-running tasks first.
if err == nil || err.(*taskError).st.State == runner.TIMEDOUT ||
(err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) {
addOrUpdateTaskDuration(s.taskDurations, durationID, time.Since(tRunner.startTime))
}
// If the node is absent, or was deleted and then re-added, we need to selectively clean up.
// The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup.
// We need the fake value so we don't clobber any new job assignments to that nodeId.
nodeId := nodeSt.node.Id()
nodeStInstance, ok := s.clusterState.getNodeState(nodeId)
nodeAbsent := !ok
nodeReAdded := false
if !nodeAbsent {
nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh)
}
nodeStChanged := nodeAbsent || nodeReAdded
preempted := false
if nodeStChanged {
nodeId = nodeId + ":ERROR"
log.WithFields(
log.Fields{
"node": nodeSt.node,
"jobID": jobID,
"taskID": taskID,
"runningJob": nodeSt.runningJob,
"runningTask": nodeSt.runningTask,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Task *node* lost, cleaning up.")
}
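// A re-added node means a new nodeState replaced the one this task started on; record the task as preempted.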
if nodeReAdded {
preempted = true
}
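// Classify the failure before logging: was the task aborted, and should the node be marked flaky?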
flaky := false
aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED)
if err != nil {
// Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes.
// TODO - we no longer set a node as flaky on failed status.
// In practice, we've observed that this results in checkout failures causing
// nodes to drop out of the cluster and reduce capacity to no benefit.
// A more comprehensive solution would be to overhaul this behavior.
taskErr := err.(*taskError)
flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED)
msg := "Error running job (will be retried):"
if aborted {
msg = "Error running task, but job kill request received, (will not retry):"
err = nil
} else {
if preventRetries {
msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask)
err = nil
} else {
jobState.errorRunningTask(taskID, err, preempted)
}
}
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"err": taskErr,
"cmd": strings.Join(taskDef.Argv, " "),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info(msg)
// If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds.
if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil {
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
}).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ")
// TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask
go func() {
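// Retry logging saga.EndTask once per second until it succeeds.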
for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) {
time.Sleep(time.Second)
}
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ")
}()
}
}
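// Mark the task completed in the job state if it finished without error or its job was killed.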
if err == nil || aborted {
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"command": strings.Join(taskDef.Argv, " "),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Ending task.")
jobState.taskCompleted(taskID, true)
}
// Update cluster state to mark this node as free and record whether we consider it flaky.
log.WithFields(
log.Fields{
"jobId": jobID,
"taskId": taskID,
"node": nodeSt.node,
"flaky": flaky,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Freeing node, removed job.")
s.clusterState.taskCompleted(nodeId, flaky)
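// Tally task counts across all in-progress jobs for the summary log lines below.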
total := 0
completed := 0
running := 0
for _, job := range s.inProgressJobs {
total += len(job.Tasks)
completed += job.TasksCompleted
running += job.TasksRunning
}
log.WithFields(
log.Fields{
"jobId": jobID,
"running": jobState.TasksRunning,
"completed": jobState.TasksCompleted,
"total": len(jobState.Tasks),
"isdone": jobState.TasksCompleted == len(jobState.Tasks),
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info()
log.WithFields(
log.Fields{
"running": running,
"completed": completed,
"total": total,
"alldone": completed == total,
"requestor": requestor,
"jobType": jobType,
"tag": tag,
}).Info("Jobs task summary")
})
}
}