runner/runners/invoke.go (401 lines of code) (raw):

package runners

import (
	e "errors"
	"fmt"
	"io"
	"io/ioutil"
	"time"

	uuid "github.com/nu7hatch/gouuid"
	log "github.com/sirupsen/logrus"

	"github.com/twitter/scoot/common/errors"
	"github.com/twitter/scoot/common/log/tags"
	"github.com/twitter/scoot/common/stats"
	"github.com/twitter/scoot/runner"
	"github.com/twitter/scoot/runner/execer"
	"github.com/twitter/scoot/runner/execer/execers"
	"github.com/twitter/scoot/snapshot"
)

// invoke.go: Invoker runs a Scoot command.

// NewInvoker creates an Invoker that will use the supplied helpers.
func NewInvoker(
	exec execer.Execer,
	filerMap runner.RunTypeMap,
	output runner.OutputCreator,
	stat stats.StatsReceiver,
	dirMonitor *stats.DirsMonitor,
	rID runner.RunnerID,
	preprocessors []func() error,
	postprocessors []func() error,
	uploader LogUploader,
) *Invoker {
	if stat == nil {
		stat = stats.NilStatsReceiver()
	}
	return &Invoker{exec: exec, filerMap: filerMap, output: output, stat: stat, dirMonitor: dirMonitor, rID: rID, preprocessors: preprocessors, postprocessors: postprocessors, uploader: uploader}
}

// Invoker runs a Scoot Command by performing the Scoot setup and gathering
// (e.g., checking out a Snapshot, or saving the Output once it's done).
// Unlike a full Runner, it has no idea of what else is running or has run.
type Invoker struct {
	exec           execer.Execer
	filerMap       runner.RunTypeMap
	output         runner.OutputCreator
	stat           stats.StatsReceiver
	dirMonitor     *stats.DirsMonitor
	rID            runner.RunnerID
	preprocessors  []func() error
	postprocessors []func() error
	uploader       LogUploader
}

// Run runs cmd.
// Run will send updates as the process is running to updateCh.
// The RunStatuses that come out of updateCh will have an empty RunID.
// Run will enforce cmd's Timeout, and will abort cmd if abortCh is signaled.
// updateCh will not close until the run is finished running.
func (inv *Invoker) Run(cmd *runner.Command, id runner.RunID) (abortCh chan<- struct{}, updateCh <-chan runner.RunStatus) {
	abortChFull := make(chan struct{})
	memChFull := make(chan execer.ProcessStatus)
	updateChFull := make(chan runner.RunStatus)
	go inv.run(cmd, id, abortChFull, memChFull, updateChFull)
	return abortChFull, updateChFull
}

// run runs cmd as run id, returning the final RunStatus.
// run will send updates as the process is running to updateCh.
// run will enforce cmd's Timeout, and will abort cmd if abortCh is signaled.
// run will not return until the process is no longer running.
func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struct{}, memCh chan execer.ProcessStatus, updateCh chan runner.RunStatus) (r runner.RunStatus) {
	log.WithFields(
		log.Fields{
			"runID":  id,
			"tag":    cmd.Tag,
			"jobID":  cmd.JobID,
			"taskID": cmd.TaskID,
		}).Info("*Invoker.run()")
	inv.stat.Gauge(stats.WorkerRunningTask).Update(1)
	defer inv.stat.Gauge(stats.WorkerRunningTask).Update(0)
	taskTimer := inv.stat.Latency(stats.WorkerTaskLatency_ms).Time()
	defer func() {
		taskTimer.Stop()
		updateCh <- r
		close(updateCh)
	}()
	start := time.Now()

	// Records various stages of the run.
	// TODO: opportunity for consolidation with existing timers and metrics as part of a larger refactor.
	rts := &runTimes{}
	rts.invokeStart = stamp()

	var co snapshot.Checkout
	checkoutCh := make(chan error)

	// set up pre/postprocessors
	for _, pp := range inv.preprocessors {
		log.Info("running preprocessor")
		if err := pp(); err != nil {
			log.Errorf("Error running preprocessor %s", err)
		}
	}
	defer func() {
		for _, pp := range inv.postprocessors {
			log.Infof("running postprocessor")
			if err := pp(); err != nil {
				log.Errorf("Error running postprocessor %s", err)
			}
		}
	}()

	// Determine RunType from Command SnapshotID.
	// This invoker supports RunTypeScoot.
	var runType runner.RunType = runner.RunTypeScoot
	if _, ok := inv.filerMap[runType]; !ok {
		return runner.FailedStatus(id, errors.NewError(fmt.Errorf("Invoker does not have filer for command of RunType: %s", runType), errors.PreProcessingFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	}

	rts.inputStart = stamp()
	// if we are checking out a snapshot, start the timer outside of the goroutine
	var downloadTimer stats.Latency
	if cmd.SnapshotID != "" {
		downloadTimer = inv.stat.Latency(stats.WorkerDownloadLatency_ms).Time()
		inv.stat.Counter(stats.WorkerDownloads).Inc(1)
	}

	// update local workspace with snapshot
	go func() {
		if cmd.SnapshotID == "" {
			// TODO: we don't want this logic to live here; these decisions should be made at a higher level.
			if len(cmd.Argv) > 0 && cmd.Argv[0] != execers.UseSimExecerArg {
				log.WithFields(
					log.Fields{
						"runID":  id,
						"tag":    cmd.Tag,
						"jobID":  cmd.JobID,
						"taskID": cmd.TaskID,
					}).Info("No snapshotID! Using a nop-checkout initialized with tmpDir")
			}
			if tmp, err := ioutil.TempDir("", "invoke_nop_checkout"); err != nil {
				checkoutCh <- err
			} else {
				co = snapshot.NewNopCheckout(string(id), tmp)
				checkoutCh <- nil
			}
		} else {
			log.WithFields(
				log.Fields{
					"runID":      id,
					"tag":        cmd.Tag,
					"jobID":      cmd.JobID,
					"taskID":     cmd.TaskID,
					"snapshotID": cmd.SnapshotID,
				}).Info("Checking out snapshotID")
			var err error
			co, err = inv.filerMap[runType].Filer.Checkout(cmd.SnapshotID)
			checkoutCh <- err
		}
	}()

	// wait for checkout to finish (or abort signal)
	select {
	case <-abortCh:
		if err := inv.filerMap[runType].Filer.CancelCheckout(); err != nil {
			log.Errorf("Error canceling checkout: %s", err)
		}
		if err := <-checkoutCh; err != nil {
			log.Errorf("Checkout errored: %s", err)
			// If there was an error there should be no lingering gitdb locks, so return.
			// In addition, co should be nil, so failing to return and calling co.Release()
			// would result in a nil pointer dereference.
		} else {
			// If there was no error then we need to release this checkout.
			co.Release()
		}
		return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case err := <-checkoutCh:
		// Stop the timer.
		// Note: aborted runs don't stop the timer - the reported download time should remain 0;
		// successful and erroring downloads will report time values.
		if cmd.SnapshotID != "" {
			downloadTimer.Stop()
		}
		if err != nil {
			var failedStatus runner.RunStatus
			codeErr, ok := err.(*errors.ExitCodeError)
			switch ok {
			case true:
				// err is of type github.com/twitter/scoot/common/errors.Error
				failedStatus = runner.FailedStatus(id, codeErr, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
			default:
				// err is not of type github.com/twitter/scoot/common/errors.Error
				failedStatus = runner.FailedStatus(id, errors.NewError(err, errors.GenericCheckoutFailureExitCode), tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
			}
			return failedStatus
		}
		// Checkout is ok; continue with the run and release the checkout when finished.
		defer co.Release()
		rts.inputEnd = stamp()
	}
	log.WithFields(
		log.Fields{
			"runID":    id,
			"tag":      cmd.Tag,
			"jobID":    cmd.JobID,
			"taskID":   cmd.TaskID,
			"checkout": co.Path(),
		}).Info("Checkout done")

	// set up stdout/stderr output
	stdout, err := inv.output.Create(fmt.Sprintf("%s-stdout", id))
	if err != nil {
		msg := fmt.Sprintf("could not create stdout: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stdout.Close()

	stderr, err := inv.output.Create(fmt.Sprintf("%s-stderr", id))
	if err != nil {
		msg := fmt.Sprintf("could not create stderr: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stderr.Close()

	stdlog, err := inv.output.Create(fmt.Sprintf("%s-stdlog", id))
	if err != nil {
		msg := fmt.Sprintf("could not create combined stdout/stderr: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stdlog.Close()

	marker := "###########################################\n###########################################\n"
	format := "%s\n\nDate: %v\nOut: %s\tErr: %s\tOutErr: %s\tCmd:\n%v\n\n%s\n\n\nSCOOT_CMD_LOG\n"
	header := fmt.Sprintf(format, marker, time.Now(), stdout.URI(), stderr.URI(), stdlog.URI(), cmd, marker)

	// If we wanted to make this optional, a switch would come either at the Worker level
	// (via Invoker -> QueueRunner construction) or at the Command level (the job requestor
	// specifies it in e.g. a PlatformProperty).
	// Processing/setup post checkout, before execution.
	switch runType {
	case runner.RunTypeScoot:
		stdout.Write([]byte(header))
		stderr.Write([]byte(header))
		stdlog.Write([]byte(header))
	}

	inv.dirMonitor.GetStartSizes() // start monitoring directory sizes

	// start running the command
	log.WithFields(
		log.Fields{
			"runID":  id,
			"tag":    cmd.Tag,
			"jobID":  cmd.JobID,
			"taskID": cmd.TaskID,
			"stdout": stdout.AsFile(),
			"stderr": stderr.AsFile(),
			"stdlog": stdlog.AsFile(),
		}).Debug("Stdout/Stderr output")
	rts.execStart = stamp() // candidate for availability via Execer
	p, err := inv.exec.Exec(execer.Command{
		Argv:    cmd.Argv,
		EnvVars: cmd.EnvVars,
		Dir:     co.Path(),
		Stdout:  io.MultiWriter(stdout, stdlog),
		Stderr:  io.MultiWriter(stderr, stdlog),
		MemCh:   memCh,
		LogTags: cmd.LogTags,
	})
	if err != nil {
		msg := fmt.Sprintf("could not exec: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.CouldNotExecExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}

	var timeoutCh <-chan time.Time
	if cmd.Timeout > 0 { // timeout if applicable
		elapsed := time.Now().Sub(start)
		timeout := time.NewTimer(cmd.Timeout - elapsed)
		timeoutCh = timeout.C
		defer timeout.Stop()
	}

	updateCh <- runner.RunningStatus(id, stdout.URI(), stderr.URI(), tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})

	processCh := make(chan execer.ProcessStatus, 1)
	go func() { processCh <- p.Wait() }()
	var runStatus runner.RunStatus

	// Wait for the process to complete (or cancel if we're told to).
	select {
	case <-abortCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		p.Abort()
		return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case <-timeoutCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		inv.stat.Counter(stats.WorkerTimeouts).Inc(1)
		p.Abort()
		log.WithFields(
			log.Fields{
				"cmd":    cmd.String(),
				"tag":    cmd.Tag,
				"jobID":  cmd.JobID,
				"taskID": cmd.TaskID,
			}).Error("Run timed out")
		runStatus = runner.TimeoutStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case st := <-memCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		log.WithFields(
			log.Fields{
				"runID":    id,
				"cmd":      cmd.String(),
				"tag":      cmd.Tag,
				"jobID":    cmd.JobID,
				"taskID":   cmd.TaskID,
				"status":   st,
				"checkout": co.Path(),
			}).Errorf(st.Error)
		inv.stat.Counter(stats.WorkerMemoryCapExceeded).Inc(1)
		runStatus = getPostExecRunStatus(st, id, cmd)
		runStatus.Error = st.Error
	case st := <-processCh:
		// Process has completed.
		log.WithFields(
			log.Fields{
				"runID":    id,
				"cmd":      cmd.String(),
				"tag":      cmd.Tag,
				"jobID":    cmd.JobID,
				"taskID":   cmd.TaskID,
				"status":   st,
				"checkout": co.Path(),
			}).Info("Run done")
		runStatus = getPostExecRunStatus(st, id, cmd)
		if runStatus.State == runner.FAILED {
			return runStatus
		}
	}

	// record the command's disk usage for the monitored directories
	inv.dirMonitor.GetEndSizes()
	inv.dirMonitor.RecordSizeStats(inv.stat)

	// the command is no longer running; post-process the results
	rts.execEnd = stamp()
	rts.outputStart = stamp()

	var stderrUrl, stdoutUrl string
	// only upload logs to a permanent location if a log uploader is initialized
	if inv.uploader != nil {
		defer inv.stat.Latency(stats.WorkerUploadLatency_ms).Time().Stop()

		// generate a unique id that's appended to the job id to create a unique identifier for logs
		logUid, _ := uuid.NewV4()
		stdlogName := "stdlog"
		stderrName := "stderr"
		stdoutName := "stdout"

		// upload stdlog
		logId := fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stdlogName)
		_, isAborted := inv.uploadLog(logId, stdlog.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		}

		// upload stderr
		// TODO: remove when we transition to using only stdlog in run status
		logId = fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stderrName)
		stderrUrl, isAborted = inv.uploadLog(logId, stderr.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		}

		// upload stdout
		// TODO: remove when we transition to using only stdlog in run status
		logId = fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stdoutName)
		stdoutUrl, isAborted = inv.uploadLog(logId, stdout.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		}

		// Note: stdout/stderr refs are only modified when logs are successfully uploaded to storage.
		runStatus.StderrRef = stderrUrl
		runStatus.StdoutRef = stdoutUrl
	} else {
		log.Infof("log uploader not initialized, skipping logs upload to storage")
		// Return local std stream refs if the log uploader is not initialized.
		runStatus.StderrRef = stderr.URI()
		runStatus.StdoutRef = stdout.URI()
	}

	rts.outputEnd = stamp()
	rts.invokeEnd = stamp()
	return runStatus
}

func (inv *Invoker) uploadLog(logId, filepath string, abortCh chan struct{}) (string, bool) {
	var url string
	var err error
	uploadCancelCh := make(chan struct{})
	uploadCh := make(chan error)
	defer inv.stat.Latency(stats.WorkerLogUploadLatency_ms).Time().Stop()
	go func() {
		url, err = inv.uploader.UploadLog(logId, filepath, uploadCancelCh)
		uploadCh <- err
	}()
	select {
	case abort := <-abortCh:
		uploadCancelCh <- abort
		return "", true
	case err := <-uploadCh:
		if err != nil {
			inv.stat.Counter(stats.WorkerLogUploadFailures).Inc(1)
			log.Error(err)
			return "", false
		}
		inv.stat.Counter(stats.WorkerUploads).Inc(1)
		return url, false
	}
}

func getPostExecRunStatus(st execer.ProcessStatus, id runner.RunID, cmd *runner.Command) (runStatus runner.RunStatus) {
	switch st.State {
	case execer.COMPLETE:
		runStatus = runner.CompleteStatus(id, "", st.ExitCode, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		if st.Error != "" {
			runStatus.Error = st.Error
		}
	case execer.FAILED:
		// use the exit code from the process status if present, otherwise use the default exit code
		ec := errors.ExitCode(errors.PostExecFailureExitCode)
		if int(st.ExitCode) != 0 {
			ec = st.ExitCode
		}
		msg := fmt.Sprintf("error execing: %s", st.Error)
		runStatus = runner.FailedStatus(id, errors.NewError(e.New(msg), ec), tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	default:
		msg := "unexpected exec state"
		runStatus = runner.FailedStatus(id, errors.NewError(e.New(msg), errors.PostExecFailureExitCode), tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	}
	return runStatus
}

// runTimes tracks timestamps for the stages of an invoker run.
// Values are only set to a non-zero Time when the stage has completed successfully.
type runTimes struct {
	invokeStart           time.Time
	invokeEnd             time.Time
	inputStart            time.Time
	actionCacheCheckStart time.Time
	actionCacheCheckEnd   time.Time
	actionFetchStart      time.Time
	actionFetchEnd        time.Time
	commandFetchStart     time.Time
	commandFetchEnd       time.Time
	inputEnd              time.Time
	execStart             time.Time
	execEnd               time.Time
	outputStart           time.Time
	outputEnd             time.Time
	queuedTime            time.Time // set by the scheduler and must be populated e.g. from task metadata
}

// stamp is a wrapper around time values to encourage "stamp()" usage so it's harder to lose track of runTimes fields.
// Longer term, we should refactor the Invoker so the checkout/exec/upload phases are
// separated from the implementation logic, which will allow these to be recorded more clearly.
func stamp() time.Time {
	return time.Now()
}
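For reference, here is a minimal sketch of how a caller might drive Invoker.Run, based only on the signatures and doc comments in this file: Run returns a send-only abort channel and a receive-only update channel that is closed once the run finishes. The helper name runAndWait and the idea of keeping it in a separate illustrative file are assumptions for this example, not code that exists in the Scoot repo.

// example_invoke_usage.go (illustrative sketch only; not part of invoke.go)
package runners

import (
	"github.com/twitter/scoot/runner"
)

// runAndWait starts cmd via inv.Run and blocks until the run has finished,
// returning the last RunStatus observed. Because updateCh is closed once the
// run completes, ranging over it is sufficient; the abort channel is ignored here.
func runAndWait(inv *Invoker, cmd *runner.Command, id runner.RunID) runner.RunStatus {
	_, updateCh := inv.Run(cmd, id)

	var last runner.RunStatus
	for st := range updateCh {
		// Intermediate statuses (e.g. the RUNNING update sent once exec starts)
		// arrive here before the final status.
		last = st
	}
	return last
}

To cancel a run instead, a caller would keep the abort channel returned by Run and signal it; run selects on that channel while waiting on the checkout, on the executing process, and on each log upload.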