runner/execer/os/execer.go (234 lines of code) (raw):
package os
import (
"fmt"
"io"
"math"
"os"
"os/exec"
"sync"
"syscall"
"time"
"github.com/twitter/scoot/common/errors"
"github.com/twitter/scoot/common/stats"
scootexecer "github.com/twitter/scoot/runner/execer"
log "github.com/sirupsen/logrus"
)
// WriterDelegater is implemented by output sinks that can expose their
// underlying io.Writer.
type WriterDelegater interface {
	// Return an underlying Writer. Why? Because some methods type assert to
	// a more specific type and are more clever (e.g., if it's an *os.File, hook it up
	// directly to a new process's stdout/stderr.)
	// We care about this cleverness, so Output both is-a and has-a Writer
	// Cf. runner/runners/local_output.go
	WriterDelegate() io.Writer
}
// Implements runner/execer.Execer
type execer struct {
	// Best effort monitoring of command to kill it if resident memory usage exceeds this cap.
	// A zero memCap disables monitoring (see Exec).
	memCap scootexecer.Memory
	// Returns the memory utilization for a pid; defaults to pw.MemUsage
	// but may be overridden via NewBoundedExecer.
	getMemUtilization func(int) (scootexecer.Memory, error)
	// Receives memory gauges/counters; NOTE(review): a nil stat will panic in
	// monitorMem/memCapKill when memCap > 0 — confirm callers always pass one.
	stat stats.StatsReceiver
	// Watches the process table to compute process-tree memory usage.
	pw ProcessWatcher
}
// NewBoundedExecer returns an execer with a ProcessWatcher and, if non-zero values
// are provided, a memCap, an overriding memory utilization function, and a
// StatsReceiver.
//
// getMemUtilization, when non-nil, replaces the default per-pid measurement with
// the caller's function (whose result is used regardless of pid); otherwise the
// watcher's process-tree memory usage for the pid is used.
func NewBoundedExecer(memCap scootexecer.Memory, getMemUtilization func() (int64, error), stat stats.StatsReceiver) *execer {
	// Assigning memCap/stat unconditionally is equivalent to the zero-value
	// guards the fields would otherwise keep: a zero memCap and nil stat are
	// the struct's zero values anyway.
	oe := &execer{
		pw:     NewProcWatcher(),
		memCap: memCap,
		stat:   stat,
	}
	// if not nil, use the provided function to get memory utilization,
	// otherwise get the memory usage of the current process and its subprocesses
	if getMemUtilization != nil {
		oe.getMemUtilization = func(int) (scootexecer.Memory, error) {
			mem, err := getMemUtilization()
			if err != nil {
				return 0, err
			}
			return scootexecer.Memory(mem), nil
		}
	} else {
		oe.getMemUtilization = oe.pw.MemUsage
	}
	return oe
}
// Exec starts a command, monitors its memory (if a memCap is configured),
// and returns a &process wrapper for it.
// Returns an error if no argv is provided or the command cannot be started.
func (e *execer) Exec(command scootexecer.Command) (scootexecer.Process, error) {
	if len(command.Argv) == 0 {
		return nil, fmt.Errorf("No command specified.")
	}
	cmd := exec.Command(command.Argv[0], command.Argv[1:]...)
	cmd.Dir = command.Dir
	// Use the parent environment plus whatever additional env vars are provided.
	cmd.Env = os.Environ()
	for k, v := range command.EnvVars {
		cmd.Env = append(cmd.Env, k+"="+v)
	}
	// Sets pgid of all child processes to cmd's pid
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	// Make sure to get the best possible Writer, so if possible os/exec can connect
	// the command's stdout/stderr directly to a file, instead of having to go through
	// our delegation
	if stdoutW, ok := command.Stdout.(WriterDelegater); ok {
		command.Stdout = stdoutW.WriterDelegate()
	}
	// BUGFIX: assert on command.Stderr, not cmd.Stderr — cmd.Stderr is always
	// nil at this point (we attach pipes below), so the stderr delegate was
	// never unwrapped, unlike the stdout path above.
	if stderrW, ok := command.Stderr.(WriterDelegater); ok {
		command.Stderr = stderrW.WriterDelegate()
	}
	// Use pipes due to possible hang in process.Wait().
	// See: https://github.com/noxiouz/stout/commit/42cc533a0bece540f2424faff2a960876b21ffd2
	stdErrPipe, err := cmd.StderrPipe()
	if err != nil {
		return nil, err
	}
	stdOutPipe, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	// Drain both pipes concurrently; the process wrapper waits on wg before
	// calling cmd.Wait so no output is lost.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		io.Copy(command.Stderr, stdErrPipe)
	}()
	go func() {
		defer wg.Done()
		io.Copy(command.Stdout, stdOutPipe)
	}()
	// Async start of the command.
	err = cmd.Start()
	if err != nil {
		return nil, err
	}
	proc := &process{cmd: cmd, wg: &wg, ats: AbortTimeoutSec, LogTags: command.LogTags}
	// A zero memCap disables memory monitoring entirely.
	if e.memCap > 0 {
		go e.monitorMem(proc, command.MemCh, command.Stderr)
	}
	return proc, nil
}
// Periodically check to make sure memory constraints are respected,
// and clean up after ourselves when the process has completed.
//
// Polls every 500ms; exits when p.result is set (process finished elsewhere)
// or after killing the process itself for exceeding e.memCap. On exit,
// best-effort SIGKILLs the remaining process group via the deferred
// cleanupProcs (only if the pgid could be determined).
// memCh, if non-nil, receives the terminal ProcessStatus on a memCap kill;
// stderr, if non-nil, additionally receives a process snapshot on the
// initial-overage path.
func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus, stderr io.Writer) {
	pid := p.cmd.Process.Pid
	pgid, err := syscall.Getpgid(pid)
	if err != nil {
		log.WithFields(
			log.Fields{
				"pid":    pid,
				"error":  err,
				"tag":    p.Tag,
				"jobID":  p.JobID,
				"taskID": p.TaskID,
			}).Error("Error finding pgid")
	} else {
		// Kill the whole group when monitoring stops, whatever the reason.
		defer cleanupProcs(pgid)
	}
	log.WithFields(
		log.Fields{
			"pid":    pid,
			"tag":    p.Tag,
			"jobID":  p.JobID,
			"taskID": p.TaskID,
		}).Info("Monitoring memory")
	// Refresh the watcher's process snapshot; the result itself is unused here
	// (presumably cached and consumed by getMemUtilization — TODO confirm).
	if _, err := e.pw.GetProcs(); err != nil {
		log.Error(err)
	}
	// check whether memory consumption is above threshold immediately on process start
	// mostly indicates that memory utilization was already above cap when the process started
	mem, err := e.getMemUtilization(pid)
	if err != nil {
		log.Debugf("Error getting memory utilization: %s", err)
		// -1 gauge value marks an unmeasurable reading.
		e.stat.Gauge(stats.WorkerMemory).Update(-1)
	} else if mem >= e.memCap {
		msg := fmt.Sprintf("Critical error detected. Initial memory utilization of worker is higher than threshold, aborting process %d: %d > %d (%v)",
			pid, mem, e.memCap, p.cmd.Args)
		e.stat.Counter(stats.WorkerHighInitialMemoryUtilization).Inc(1)
		// Record a FAILED result under the lock before killing, so concurrent
		// readers of p.result see a terminal state.
		p.mutex.Lock()
		p.result = &scootexecer.ProcessStatus{
			State:    scootexecer.FAILED,
			Error:    msg,
			ExitCode: errors.HighInitialMemoryUtilizationExitCode,
		}
		// log the process snapshot in worker log, as well as task stderr log
		e.pw.LogProcs(p, log.ErrorLevel, stderr)
		p.mutex.Unlock()
		e.memCapKill(p, mem, memCh)
		return
	}
	// Report on larger changes when utilization is low, and smaller changes as
	// utilization reaches 100% (see reportThresholds below).
	thresholdsIdx := 0
	reportThresholds := []float64{0, .25, .5, .75, .85, .9, .93, .95, .96, .97, .98, .99, 1}
	memTicker := time.NewTicker(500 * time.Millisecond)
	defer memTicker.Stop()
	for {
		select {
		case <-memTicker.C:
			p.mutex.Lock()
			// Process is complete
			if p.result != nil {
				p.mutex.Unlock()
				log.WithFields(
					log.Fields{
						"pid":    pid,
						"tag":    p.Tag,
						"jobID":  p.JobID,
						"taskID": p.TaskID,
					}).Info("Finished monitoring memory")
				return
			}
			// Refresh the process snapshot before each measurement.
			if _, err := e.pw.GetProcs(); err != nil {
				log.Error(err)
			}
			mem, err := e.getMemUtilization(pid)
			if err != nil {
				p.mutex.Unlock()
				log.Debugf("Error getting memory utilization: %s", err)
				e.stat.Gauge(stats.WorkerMemory).Update(-1)
				continue
			}
			e.stat.Gauge(stats.WorkerMemory).Update(int64(mem))
			// Abort process if calculated memory utilization is above memCap
			if mem >= e.memCap {
				msg := fmt.Sprintf("Cmd exceeded MemoryCap, aborting process %d: %d > %d (%v)", pid, mem, e.memCap, p.cmd.Args)
				p.result = &scootexecer.ProcessStatus{
					State:    scootexecer.COMPLETE,
					Error:    msg,
					ExitCode: 1,
				}
				e.pw.LogProcs(p, log.ErrorLevel, stderr)
				p.mutex.Unlock()
				e.memCapKill(p, mem, memCh)
				return
			}
			// Report on larger changes when utilization is low, and smaller changes as utilization reaches 100%.
			memUsagePct := math.Min(1.0, float64(mem)/float64(e.memCap))
			if memUsagePct > reportThresholds[thresholdsIdx] {
				log.WithFields(
					log.Fields{
						"memUsagePct": int(memUsagePct * 100),
						"mem":         mem,
						"memCap":      e.memCap,
						"args":        p.cmd.Args,
						"pid":         pid,
						"tag":         p.Tag,
						"jobID":       p.JobID,
						"taskID":      p.TaskID,
					}).Infof("Memory utilization increased to %d%%, pid: %d", int(memUsagePct*100), pid)
				e.pw.LogProcs(p, log.DebugLevel, nil)
				// Advance past every threshold already crossed so each is
				// reported at most once.
				for memUsagePct > reportThresholds[thresholdsIdx] {
					thresholdsIdx++
				}
			}
			p.mutex.Unlock()
		}
	}
}
// memCapKill kills the process, handles cleanup and returns the process status
// to the invoker via memCh (if non-nil). It expects p.result to already be set
// to the terminal status describing the memCap violation.
func (e *execer) memCapKill(p *process, mem scootexecer.Memory, memCh chan scootexecer.ProcessStatus) {
	log.WithFields(
		log.Fields{
			"mem":    mem,
			"memCap": e.memCap,
			"args":   p.cmd.Args,
			"pid":    p.cmd.Process.Pid,
			"tag":    p.Tag,
			"jobID":  p.JobID,
			"taskID": p.TaskID,
		}).Info(p.result.Error)
	if memCh != nil {
		memCh <- *p.result
	}
	p.MemCapKill()
	// record memory after killing process
	postKillMem, err := e.getMemUtilization(p.cmd.Process.Pid)
	if err != nil {
		log.Debugf("Error getting memory utilization after killing process: %s", err)
		// BUGFIX: return here. Previously the -1 error marker was immediately
		// overwritten by an unconditional Update(int64(postKillMem)) with the
		// zero-valued postKillMem, masking the measurement failure.
		e.stat.Gauge(stats.WorkerMemory).Update(-1)
		return
	}
	e.stat.Gauge(stats.WorkerMemory).Update(int64(postKillMem))
}
// cleanupProcs sends SIGKILL to every process in the group pgid (the command
// and all of its children, assuming none of them called setpgid themselves)
// and returns the error from the kill syscall, if any.
func cleanupProcs(pgid int) error {
	log.WithFields(log.Fields{"pgid": pgid}).Info("Cleaning up pgid")
	// A negative pid targets the whole process group.
	err := syscall.Kill(-pgid, syscall.SIGKILL)
	if err != nil {
		log.WithFields(log.Fields{
			"pgid":  pgid,
			"error": err,
		}).Error("Error cleaning up pgid")
	}
	return err
}