runner/execer/os/process_watcher.go (122 lines of code) (raw):

package os

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"time"

	log "github.com/sirupsen/logrus"

	scootexecer "github.com/twitter/scoot/runner/execer"
)

// ProcessWatcher abstracts process inspection so that memCap monitoring can be mocked.
type ProcessWatcher interface {
	// GetProcs takes a snapshot of the running processes, keyed by pid.
	GetProcs() (map[int]ProcInfo, error)
	// MemUsage reports the memory used by the given pid together with its related processes.
	MemUsage(int) (scootexecer.Memory, error)
	// LogProcs logs a ps listing at the given level and optionally copies it to the writer.
	LogProcs(*process, log.Level, io.Writer)
}

// ProcInfo is one entry of a ps snapshot.
type ProcInfo struct {
	pid  int // process id
	pgid int // process group id
	ppid int // parent process id
	rss  int // resident set size as reported by ps
}

// Pid returns the process id of this entry.
func (p ProcInfo) Pid() int {
	return p.pid
}

// procWatcher is the ps-backed ProcessWatcher. Its fields hold the snapshot taken
// by the most recent successful GetProcs call.
type procWatcher struct {
	allProcesses    map[int]ProcInfo   // pid -> process
	processGroups   map[int][]ProcInfo // pgid -> processes in that group
	parentProcesses map[int][]ProcInfo // ppid -> direct children of that parent
}

// NewProcWatcher returns a procWatcher with no snapshot; call GetProcs before MemUsage.
func NewProcWatcher() *procWatcher {
	return &procWatcher{}
}

// GetProcs gets a full list of running processes, including their pid, pgid, ppid, and
// memory usage, and stores the result in procWatcher's fields. On error the previously
// stored snapshot is left untouched.
func (opw *procWatcher) GetProcs() (map[int]ProcInfo, error) {
	// ps is invoked directly instead of through a `bash -c "ps | tr | sed"` pipeline:
	// a pipeline reports the exit status of its last command, which would hide a
	// failing ps. Splitting the output here yields the same entries.
	out, err := exec.Command("ps", "-e", "-o", "pid=", "-o", "pgid=", "-o", "ppid=", "-o", "rss=").Output()
	if err != nil {
		return nil, fmt.Errorf("listing processes with ps: %w", err)
	}
	procs := strings.Split(strings.TrimSuffix(string(out), "\n"), "\n")

	ap, pg, pp, err := parseProcs(procs)
	if err != nil {
		return nil, err
	}
	opw.allProcesses = ap
	opw.processGroups = pg
	opw.parentProcesses = pp
	return opw.allProcesses, nil
}

// MemUsage sums memory usage for a given process, including usage by related processes:
// every process in pid's process group, plus all descendants of those processes.
// It reads the snapshot stored by the last GetProcs call and returns an error if pid
// is not part of that snapshot.
func (opw *procWatcher) MemUsage(pid int) (scootexecer.Memory, error) {
	target, ok := opw.allProcesses[pid]
	if !ok {
		return 0, fmt.Errorf("%d was not present in list of all processes", pid)
	}

	// related is a work queue that grows while it is being walked; seen keeps every pid
	// from being queued (and counted) more than once. Walking a slice by index instead of
	// ranging over a map that is modified in place keeps the summation deterministic.
	group := opw.processGroups[target.pgid]
	related := make([]ProcInfo, 0, len(group))
	seen := make(map[int]struct{}, len(group))
	total := 0

	// Seed the queue with all procs from pid's process group.
	for _, p := range group {
		if _, dup := seen[p.pid]; dup {
			continue
		}
		seen[p.pid] = struct{}{}
		related = append(related, p)
		total += p.rss
	}

	// Add all child procs of processes already queued (and, transitively, their children).
	for i := 0; i < len(related); i++ {
		for _, child := range opw.parentProcesses[related[i].pid] {
			if _, dup := seen[child.pid]; dup {
				continue
			}
			seen[child.pid] = struct{}{}
			related = append(related, child)
			total += child.rss
		}
	}

	return scootexecer.Memory(total * bytesToKB), nil
}

// LogProcs logs the process snapshot of the current process along with other running
// processes for the user in the worker log, at the specified level. Also writes to the
// writer, if provided. It does nothing when the level is not enabled.
func (opw *procWatcher) LogProcs(p *process, level log.Level, w io.Writer) {
	if !log.IsLevelEnabled(level) {
		return
	}

	// Run ps with a timeout since it seems CombinedOutput() sometimes fails to return.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	pid := p.cmd.Process.Pid
	ps, err := exec.CommandContext(ctx, "ps", "-u", os.Getenv("USER"), "-opid,sess,ppid,pgid,rss,args", "--sort=-rss").CombinedOutput()
	log.WithFields(
		log.Fields{
			"pid":    pid,
			"ps":     string(ps),
			"err":    err,
			"errCtx": ctx.Err(),
			"tag":    p.Tag,
			"jobID":  p.JobID,
			"taskID": p.TaskID,
		}).Log(level, fmt.Sprintf("ps after increased memory utilization for pid %d", pid))

	if w == nil {
		return
	}
	// Copying the listing to w is best-effort diagnostics; a failed write must not
	// disturb the caller, so the errors are deliberately dropped.
	_, _ = fmt.Fprintf(w, "\nps after increased memory utilization for pid %d:\n\n", pid)
	_, _ = w.Write(ps)
}

// parseProcs formats processes into pgid and ppid groups for summation of memory usage.
// Each entry of procs must hold four whitespace-separated integers: pid, pgid, ppid, rss.
// It returns the processes keyed by pid, grouped by pgid, and grouped by ppid. If any
// entry cannot be parsed, all maps are nil and err identifies the offending entry.
func parseProcs(procs []string) (allProcesses map[int]ProcInfo, processGroups map[int][]ProcInfo,
	parentProcesses map[int][]ProcInfo, err error) {
	allProcesses = make(map[int]ProcInfo, len(procs))
	processGroups = make(map[int][]ProcInfo)
	parentProcesses = make(map[int][]ProcInfo)

	for _, entry := range procs {
		var p ProcInfo
		// Sscanf returns a non-nil error whenever fewer than four fields were filled,
		// so no separate check of the parsed count is needed.
		n, scanErr := fmt.Sscanf(entry, "%d %d %d %d", &p.pid, &p.pgid, &p.ppid, &p.rss)
		if scanErr != nil {
			return nil, nil, nil, fmt.Errorf("parsing ps entry %q: got %d of 4 fields: %w", entry, n, scanErr)
		}
		allProcesses[p.pid] = p
		processGroups[p.pgid] = append(processGroups[p.pgid], p)
		parentProcesses[p.ppid] = append(parentProcesses[p.ppid], p)
	}
	return allProcesses, processGroups, parentProcesses, nil
}