runner/runners/queue.go (420 lines of code) (raw):

package runners import ( "fmt" "os" "sync" "time" log "github.com/sirupsen/logrus" "github.com/twitter/scoot/common/errors" "github.com/twitter/scoot/common/log/hooks" "github.com/twitter/scoot/common/log/tags" "github.com/twitter/scoot/common/stats" "github.com/twitter/scoot/runner" "github.com/twitter/scoot/runner/execer" "github.com/twitter/scoot/snapshot" ) // Used to get proper logging from tests... func init() { if loglevel := os.Getenv("SCOOT_LOGLEVEL"); loglevel != "" { level, err := log.ParseLevel(loglevel) if err != nil { log.Error(err) return } log.SetLevel(level) log.AddHook(hooks.NewContextHook()) } else { // setting Error level to avoid Travis test failure due to log too long log.SetLevel(log.ErrorLevel) } } const QueueFullMsg = "No resources available. Please try later." const QueueInitingMsg = "Queue is still initializing. Please try later." const QueueInvalidMsg = "Failed initialization, queue permanently broken." const WorkerUnhealthyMsg = "Worker is unhealthy." type result struct { st runner.RunStatus err error } type runReq struct { cmd *runner.Command resultCh chan result } type abortReq struct { runID runner.RunID resultCh chan result } type cmdAndID struct { cmd *runner.Command id runner.RunID } /* NewQueueRunner creates a new Service that uses a Queue If the worker has an initialization step (indicated by non-nil in idc) the queue will wait for the worker to finish initialization before accepting any run requests. If the queue is full when a command is received, an empty RunStatus and a queue full error will be returned. @param: exec - runs the command @param: filerMap - mapping of runner.RunType's to filers and corresponding InitDoneCh's that are used by underlying Invokers. The Controller waits on all non-nil InitDoneCh's to complete successfully before serving requests. @param: output @param: tmp @param: capacity - the maximum number of commands to support on the queue. If 0 then the queue is unbounded. @param: stats - the stats receiver the queue will use when reporting its metrics @param: dirMonitor - monitor directory size changes from running the task's command @param: rID - the runner id */ func NewQueueRunner( exec execer.Execer, filerMap runner.RunTypeMap, output runner.OutputCreator, capacity int, stat stats.StatsReceiver, dirMonitor *stats.DirsMonitor, rID runner.RunnerID, preprocessors []func() error, postprocessors []func() error, uploader LogUploader, ) runner.Service { if stat == nil { stat = stats.NilStatsReceiver() } //FIXME(jschiller): proper history config rather than keying off of capacity and if this is a SingleRunner. history := 1 if capacity > 0 { history = 0 // unlimited if acting as a queue (vs single runner). } else if capacity == 0 { capacity = 1 // singleRunner, override capacity so it can actually run a command. } statusManager := NewStatusManager(history) inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors, uploader) controller := &QueueController{ statusManager: statusManager, inv: inv, filerMap: filerMap, updateReq: make(map[runner.RunType]bool), capacity: capacity, reqCh: make(chan interface{}), updateCh: make(chan interface{}), cancelTimerCh: make(chan interface{}), } run := &Service{controller, statusManager} // QueueRunner waits on filers with InitDoneChannels defined to return, // and will not serve requests if any return an error var wg sync.WaitGroup var initErr error = nil wait := false for t, f := range filerMap { if f.IDC != nil { log.Infof("Starting goroutine to wait on init for filer %v", t) wg.Add(1) wait = true go func(rt runner.RunType, idc snapshot.InitDoneCh) { err := <-idc if err != nil { initErr = err log.Errorf("Init channel for filer %v returned error: %s", rt, err) } wg.Done() }(t, f.IDC) } } if wait { // Wait for initialization to complete in a new goroutine to let this constructor return go func() { wg.Wait() if initErr != nil { stat.Counter(stats.WorkerDownloadInitFailure).Inc(1) statusManager.UpdateService(runner.ServiceStatus{Initialized: false, Error: initErr}) } else { statusManager.UpdateService(runner.ServiceStatus{Initialized: true, IsHealthy: true}) stat.Gauge(stats.WorkerUnhealthy).Update(0) controller.startUpdateTickers() } }() } else { statusManager.UpdateService(runner.ServiceStatus{Initialized: true, IsHealthy: true}) stat.Gauge(stats.WorkerUnhealthy).Update(0) controller.startUpdateTickers() } go controller.loop() return run } // NewSingleRunner create a SingleRunner func NewSingleRunner( exec execer.Execer, filerMap runner.RunTypeMap, output runner.OutputCreator, stat stats.StatsReceiver, dirMonitor *stats.DirsMonitor, rID runner.RunnerID, preprocessors []func() error, postprocessors []func() error, uploader LogUploader, ) runner.Service { return NewQueueRunner(exec, filerMap, output, 0, stat, dirMonitor, rID, preprocessors, postprocessors, uploader) } // QueueController maintains a queue of commands to run (up to capacity). // Manages updates to underlying Filer via Filer's Update interface, // if a non-zero update interval is defined (updates and tasks cannot run concurrently) type QueueController struct { inv *Invoker filerMap runner.RunTypeMap updateReq map[runner.RunType]bool updateLock sync.Mutex statusManager *StatusManager capacity int queue []cmdAndID runningID runner.RunID runningCmd *runner.Command runningAbort chan<- struct{} // used to track if there is a recurring infrastructure issue lastExitCode errors.ExitCode // used to signal a cmd run request reqCh chan interface{} // used to signal a request to update the Filer updateCh chan interface{} // used to cancel the timer goroutine if started. cancelTimerCh chan interface{} } // Start goroutines that trigger periodic filer updates after controller is initialized func (c *QueueController) startUpdateTickers() { if c.updateCh == nil { return } for t, f := range c.filerMap { d := f.Filer.UpdateInterval() if d == snapshot.NoDuration { continue } go func(rt runner.RunType, td time.Duration) { ticker := time.NewTicker(td) Loop: for { select { case <-ticker.C: c.updateLock.Lock() c.updateReq[rt] = true c.updateLock.Unlock() c.updateCh <- nil case <-c.cancelTimerCh: ticker.Stop() break Loop } } }(t, d) } } // Run enqueues the command or rejects it, returning its status or an error. func (c *QueueController) Run(cmd *runner.Command) (runner.RunStatus, error) { resultCh := make(chan result) c.reqCh <- runReq{cmd, resultCh} result := <-resultCh return result.st, result.err } func (c *QueueController) enqueue(cmd *runner.Command) (runner.RunStatus, error) { _, svcStatus, _ := c.statusManager.StatusAll() log.WithFields( log.Fields{ "ready": svcStatus.Initialized && svcStatus.IsHealthy, "err": svcStatus.Error, "availableSlots": c.capacity - len(c.queue), "totalSlots": c.capacity, "currentRun": c.runningID, "jobID": cmd.JobID, "taskID": cmd.TaskID, "tag": cmd.Tag, }).Info("Trying to run") if !svcStatus.Initialized { errStr := QueueInitingMsg if svcStatus.Error != nil { errStr = svcStatus.Error.Error() } return runner.RunStatus{Error: errStr}, fmt.Errorf(QueueInitingMsg) } if !svcStatus.IsHealthy { errStr := WorkerUnhealthyMsg if svcStatus.Error != nil { errStr = svcStatus.Error.Error() } return runner.RunStatus{Error: errStr}, fmt.Errorf(WorkerUnhealthyMsg) } if len(c.queue) >= c.capacity { return runner.RunStatus{}, fmt.Errorf(QueueFullMsg) } st, err := c.statusManager.NewRun() if err != nil { return st, err } c.queue = append(c.queue, cmdAndID{cmd, st.RunID}) return st, nil } // Abort kills the given run, returning its final status. func (c *QueueController) Abort(run runner.RunID) (runner.RunStatus, error) { resultCh := make(chan result) c.reqCh <- abortReq{run, resultCh} result := <-resultCh return result.st, result.err } func (c *QueueController) abort(run runner.RunID) (runner.RunStatus, error) { if run == c.runningID { if c.runningAbort != nil { log.WithFields( log.Fields{ "currentRun": c.runningID, "jobID": c.runningCmd.JobID, "taskID": c.runningCmd.TaskID, "tag": c.runningCmd.Tag, }).Info("Aborting") close(c.runningAbort) c.runningAbort = nil } } else { for i, cmdID := range c.queue { if run == cmdID.id { log.WithFields( log.Fields{ "run": run, "jobID": cmdID.cmd.JobID, "taskID": cmdID.cmd.TaskID, "tag": cmdID.cmd.Tag, }).Info("Aborting queued run") c.queue = append(c.queue[:i], c.queue[i+1:]...) c.statusManager.Update(runner.AbortStatus( run, tags.LogTags{ JobID: cmdID.cmd.JobID, TaskID: cmdID.cmd.TaskID, Tag: cmdID.cmd.Tag, }, )) } } } status, _, err := runner.FinalStatus(c.statusManager, run) return status, err } // Cancels all goroutines created by this instance and exits run loop. func (c *QueueController) Release() { close(c.reqCh) } // Handle requests to run and update, to provide concurrency management between the two. // Although we can still receive run requests, runs and updates are done blocking. func (c *QueueController) loop() { var watchCh chan runner.RunStatus var updateDoneCh chan interface{} updateRequested := false var idleLatency = c.inv.stat.Latency(stats.WorkerIdleLatency_ms) idleLatency.Time() tryUpdate := func() { if watchCh == nil && updateDoneCh == nil { updateRequested = false updateDoneCh = make(chan interface{}) go func() { // Record filers by RunType that have requested updates c.updateLock.Lock() typesToUpdate := []runner.RunType{} for t, u := range c.updateReq { if u { typesToUpdate = append(typesToUpdate, t) c.updateReq[t] = false } } c.updateLock.Unlock() // Run all requested filer updates serially for _, t := range typesToUpdate { log.Infof("Running filer update for type %v", t) if err := c.filerMap[t].Filer.Update(); err != nil { fields := log.Fields{"err": err} if c.runningCmd != nil { fields["runType"] = t fields["tag"] = c.runningCmd.Tag fields["jobID"] = c.runningCmd.JobID fields["taskID"] = c.runningCmd.TaskID } log.WithFields(fields).Error("Error running Filer Update") } } updateDoneCh <- nil }() } else { updateRequested = true } } tryRun := func() { if watchCh == nil && updateDoneCh == nil && len(c.queue) > 0 && (c.statusManager.svcStatus.IsHealthy) { cmdID := c.queue[0] watchCh = c.runAndWatch(cmdID) idleLatency.Stop() } } for c.reqCh != nil { // Prefer to run an update first if we have one scheduled. If not updating, try running. if updateRequested { tryUpdate() } else { select { case <-c.updateCh: tryUpdate() default: tryRun() } } // Wait on update, updateDone, run start, or run abort. select { case <-c.updateCh: tryUpdate() case <-updateDoneCh: // Give run() a chance right after we've finished updating. updateDoneCh = nil tryRun() case req, ok := <-c.reqCh: // Handle run and abort requests. if !ok { c.reqCh = nil close(c.cancelTimerCh) return } switch r := req.(type) { case runReq: st, err := c.enqueue(r.cmd) r.resultCh <- result{st, err} case abortReq: st, err := c.abort(r.runID) r.resultCh <- result{st, err} } case <-watchCh: // Handle finished run by resetting state. watchCh = nil c.runningID = "" c.runningCmd = nil c.runningAbort = nil c.queue = c.queue[1:] idleLatency.Time() } } } // Run cmd and then start a new goroutine to watch the cmd. // Returns a watchCh for goroutine completion. func (c *QueueController) runAndWatch(cmdID cmdAndID) chan runner.RunStatus { log.WithFields( log.Fields{ "jobID": cmdID.cmd.JobID, "taskID": cmdID.cmd.TaskID, "runID": cmdID.id, "newLen": len(c.queue), "tag": cmdID.cmd.Tag, }).Info("Running") watchCh := make(chan runner.RunStatus) abortCh, statusUpdateCh := c.inv.Run(cmdID.cmd, cmdID.id) c.runningAbort = abortCh c.runningID = cmdID.id c.runningCmd = cmdID.cmd go func() { for st := range statusUpdateCh { log.WithFields( log.Fields{ "runID": st.RunID, "state": st.State, "stdout": st.StdoutRef, "stderr": st.StderrRef, "snapshotID": st.SnapshotID, "exitCode": st.ExitCode, "error": st.Error, "jobID": st.JobID, "taskID": st.TaskID, "tag": st.Tag, }).Info("Queue received status update") c.statusManager.Update(st) if st.State.IsDone() { c.checkAndUpdateServiceHealth(st.ExitCode) c.lastExitCode = errors.ExitCode(st.ExitCode) watchCh <- st return } } }() return watchCh } /* checkAndUpdateServiceHealth updates service status to unhealthy if critical or persistent errors occurred */ func (c *QueueController) checkAndUpdateServiceHealth(exitCode errors.ExitCode) { var svcStatusErr error markUnhealthy := false if c.isCriticalError(exitCode) { svcStatusErr = fmt.Errorf("critical error (%d) occurred. Marking worker unhealthy", exitCode) markUnhealthy = true } if c.isPersistentError(exitCode) { svcStatusErr = fmt.Errorf("errors (%d) occurred multiple times in a row. Marking worker unhealthy", exitCode) markUnhealthy = true } if markUnhealthy { svcStatus := c.statusManager.svcStatus svcStatus.IsHealthy = false svcStatus.Error = svcStatusErr c.statusManager.UpdateService(svcStatus) c.inv.stat.Gauge(stats.WorkerUnhealthy).Update(1) } } /* isPersistentError returns true when one of the critical errors has occurred 2 times in a row. */ func (c *QueueController) isPersistentError(exitCode errors.ExitCode) bool { return exitCode == c.lastExitCode && (exitCode == errors.CleanFailureExitCode || exitCode == errors.CheckoutFailureExitCode) } /* isCriticalError returns true when one of the critical errors has occurred. */ func (c *QueueController) isCriticalError(exitCode errors.ExitCode) bool { return exitCode == errors.HighInitialMemoryUtilizationExitCode }