runner/execer/os/process.go (222 lines of code) (raw):

package os import ( "context" "errors" "fmt" "os" "os/exec" "sync" "syscall" "time" scooterror "github.com/twitter/scoot/common/errors" "github.com/twitter/scoot/common/log/tags" scootexecer "github.com/twitter/scoot/runner/execer" log "github.com/sirupsen/logrus" ) // Implements runner/scootexecer.Process type process struct { cmd *exec.Cmd wg *sync.WaitGroup waiting bool result *scootexecer.ProcessStatus mutex sync.Mutex ats int // Abort Timeout before sigkill, in Seconds tags.LogTags } // Wait for the process to finish. // If the command finishes without error return the status COMPLETE and exit Code 0. // If the command fails, and we can get the exit code from the command, return COMPLETE with the failing exit code. // if the command fails and we cannot get the exit code from the command, return FAILED and the error // that prevented getting the exit code. func (p *process) Wait() (result scootexecer.ProcessStatus) { p.mutex.Lock() p.waiting = true p.mutex.Unlock() // Wait for the output goroutines to finish then wait on the process itself to release resources. p.wg.Wait() pid := p.cmd.Process.Pid err := p.cmd.Wait() log.WithFields( log.Fields{ "pid": pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Infof("Finished waiting for process") p.mutex.Lock() defer p.mutex.Unlock() p.waiting = false // Trace output with timeout since it seems CombinedOutput() sometimes fails to return. if log.IsLevelEnabled(log.TraceLevel) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ps, errDbg := exec.CommandContext(ctx, "ps", "-u", os.Getenv("USER"), "-opid,sess,ppid,pgid,rss,args").CombinedOutput() log.WithFields( log.Fields{ "pid": pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, "ps": string(ps), "err": errDbg, "errCtx": ctx.Err(), }).Tracef("Current ps for pid %d", pid) cancel() } if p.result != nil { return *p.result } else { p.result = &result } if err == nil { // the command finished without an error result.State = scootexecer.COMPLETE result.ExitCode = 0 // stdout and stderr are collected and set by (invoke.go) runner return result } if err, ok := err.(*exec.ExitError); ok { // the command returned an error, if we can get a WaitStatus from the error, // we can get the commands exit code if status, ok := err.Sys().(syscall.WaitStatus); ok { result.State = scootexecer.COMPLETE result.ExitCode = scooterror.ExitCode(status.ExitStatus()) // stdout and stderr are collected and set by (invoke.go) runner return result } result.State = scootexecer.FAILED result.Error = "Could not find WaitStatus from exiterr.Sys()" return result } result.State = scootexecer.FAILED result.Error = err.Error() return result } // Attempt to SIGTERM process, allowing for graceful exit // SIGKILL after 10 seconds or if process.cmd.Wait() returns an error func (p *process) Abort() scootexecer.ProcessStatus { p.mutex.Lock() defer p.mutex.Unlock() if p.result != nil { return *p.result } else { p.result = &scootexecer.ProcessStatus{} } p.result.State = scootexecer.FAILED p.result.ExitCode = -1 p.result.Error = "Aborted" if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil { msg := fmt.Sprintf("Error aborting command via SIGTERM: %s.", err) log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Errorf(msg) p.KillAndWait(msg) } else { log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Info("Aborting process via SIGTERM") } // Add buffer in case of race condition where both <-cmdDoneCh returns an error & timeout is exceeded at same time errCh := make(chan error, 1) go func() { select { case <-time.After(time.Second * time.Duration(p.ats)): errCh <- errors.New(fmt.Sprintf("%d second timeout exceeded.", p.ats)) } }() cmdDoneCh := make(chan error) // Wait in the process if nothing already has claimed it; // if cmd.Wait() was already called, calling it again is an immediate error. // If already called, just poll periodically if the process has exited if !p.waiting { go func() { // p.wg ignored cmdDoneCh <- p.cmd.Wait() }() } else { go func() { timeout := time.Now().Add(time.Second * time.Duration(p.ats)) for time.Now().Before(timeout) { // note that we can't rely on ProcessState.Exited() - not true when p is signaled if p.cmd.ProcessState != nil { cmdDoneCh <- nil return } time.Sleep(10 * time.Millisecond) } }() } for { select { case err := <-cmdDoneCh: sigtermed := false if err == nil { sigtermed = true } if err != nil { if err, ok := err.(*exec.ExitError); ok { if status, ok := err.Sys().(syscall.WaitStatus); ok { if status.Signaled() { sigtermed = true } } } } if sigtermed { log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Info("Command finished via SIGTERM") p.result.Error += " (SIGTERM)" return *p.result } else { // We weren't able to infer the task exited either normally or due to sigterm msg := fmt.Sprintf("Command failed to terminate successfully: %v", err) log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Error(msg) errCh <- errors.New(msg) // Loop back and pull from errCh to force cleanup } case msg := <-errCh: log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Error(msg) p.KillAndWait(fmt.Sprintf("%s. Killing command.", msg)) p.result.Error += " (SIGKILL)" return *p.result default: } } } // Kill process for exceeding MemCap func (p *process) MemCapKill() { p.mutex.Lock() defer p.mutex.Unlock() if p.result == nil { p.result = &scootexecer.ProcessStatus{} } p.result.State = scootexecer.FAILED p.result.ExitCode = -1 p.KillAndWait("Killed for memory usage over MemCap") } // Kills process via SIGKILL and all processes of its pgid func (p *process) KillAndWait(resultError string) { pgid, err := syscall.Getpgid(p.cmd.Process.Pid) if err != nil { log.WithFields( log.Fields{ "pid": p.cmd.Process.Pid, "error": err, "tag": p.Tag, "jobID": p.JobID, "taskID": p.TaskID, }).Error("Error finding pgid") } else { defer cleanupProcs(pgid) } p.result.Error += fmt.Sprintf(" %s", resultError) err = p.cmd.Process.Kill() if err != nil { p.result.Error += fmt.Sprintf(" Couldn't kill process: %s. Will still attempt cleanup.", err) } _, err = p.cmd.Process.Wait() if err, ok := err.(*exec.ExitError); ok { if status, ok := err.Sys().(syscall.WaitStatus); ok { p.result.ExitCode = scooterror.ExitCode(status.ExitStatus()) } } }