in runner/execer/os/execer.go [128:243]
// monitorMem watches the memory utilization reported by e.getMemUtilization for
// process p and aborts p once utilization reaches e.memCap.
//
// A sample is taken immediately and then every 500ms. The function blocks until
// either p.result has already been set (the process is complete) or this monitor
// aborts the process via memCapKill, so callers presumably run it in its own
// goroutine. memCh is only forwarded to memCapKill. stderr receives a process
// snapshot when the process is aborted because of high initial utilization.
//
// If the process group of p can be determined, cleanupProcs is invoked on it when
// monitoring ends, regardless of how it ends.
func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus, stderr io.Writer) {
	pid := p.cmd.Process.Pid
	// Fields shared by every log entry emitted for this process.
	logger := log.WithFields(log.Fields{
		"pid":    pid,
		"tag":    p.Tag,
		"jobID":  p.JobID,
		"taskID": p.TaskID,
	})

	pgid, err := syscall.Getpgid(pid)
	if err != nil {
		logger.WithField("error", err).Error("Error finding pgid")
	} else {
		defer cleanupProcs(pgid)
	}
	logger.Info("Monitoring memory")
	if _, err := e.pw.GetProcs(); err != nil {
		log.Error(err)
	}

	// Check memory right away: a reading already at or above the cap at process
	// start mostly indicates the worker was over its limit before this process began.
	mem, err := e.getMemUtilization(pid)
	if err != nil {
		log.Debugf("Error getting memory utilization: %s", err)
		e.stat.Gauge(stats.WorkerMemory).Update(-1)
	} else {
		// Report the initial sample as well, just like every periodic sample below.
		e.stat.Gauge(stats.WorkerMemory).Update(int64(mem))
		if mem >= e.memCap {
			msg := fmt.Sprintf("Critical error detected. Initial memory utilization of worker is higher than threshold, aborting process %d: %d > %d (%v)",
				pid, mem, e.memCap, p.cmd.Args)
			e.stat.Counter(stats.WorkerHighInitialMemoryUtilization).Inc(1)
			p.mutex.Lock()
			if p.result != nil {
				// The process already completed; do not overwrite its result or try
				// to kill it (the periodic loop below applies the same rule).
				p.mutex.Unlock()
				logger.Info("Finished monitoring memory")
				return
			}
			p.result = &scootexecer.ProcessStatus{
				State:    scootexecer.FAILED,
				Error:    msg,
				ExitCode: errors.HighInitialMemoryUtilizationExitCode,
			}
			// Log the process snapshot in the worker log as well as the task stderr log.
			e.pw.LogProcs(p, log.ErrorLevel, stderr)
			p.mutex.Unlock()
			e.memCapKill(p, mem, memCh)
			return
		}
	}

	// Utilization levels (fractions of memCap) at which progress is logged: coarse
	// steps while usage is low, finer steps as it approaches the cap.
	reportThresholds := []float64{0, .25, .5, .75, .85, .9, .93, .95, .96, .97, .98, .99, 1}
	thresholdsIdx := 0
	memTicker := time.NewTicker(500 * time.Millisecond)
	defer memTicker.Stop()
	for range memTicker.C {
		// p.mutex stays held from the completion check until p.result is written
		// (if it is), so the check and the write are atomic with respect to other
		// holders of p.mutex.
		p.mutex.Lock()
		if p.result != nil {
			// Process is complete.
			p.mutex.Unlock()
			logger.Info("Finished monitoring memory")
			return
		}
		if _, err := e.pw.GetProcs(); err != nil {
			log.Error(err)
		}
		mem, err = e.getMemUtilization(pid)
		if err != nil {
			p.mutex.Unlock()
			log.Debugf("Error getting memory utilization: %s", err)
			e.stat.Gauge(stats.WorkerMemory).Update(-1)
			continue
		}
		e.stat.Gauge(stats.WorkerMemory).Update(int64(mem))

		// Abort the process if the measured utilization is at or above memCap.
		if mem >= e.memCap {
			msg := fmt.Sprintf("Cmd exceeded MemoryCap, aborting process %d: %d > %d (%v)", pid, mem, e.memCap, p.cmd.Args)
			p.result = &scootexecer.ProcessStatus{
				State:    scootexecer.COMPLETE,
				Error:    msg,
				ExitCode: 1,
			}
			e.pw.LogProcs(p, log.ErrorLevel, stderr)
			p.mutex.Unlock()
			e.memCapKill(p, mem, memCh)
			return
		}

		// Report on larger changes when utilization is low, and smaller changes as
		// utilization reaches 100%.
		memUsagePct := math.Min(1.0, float64(mem)/float64(e.memCap))
		if memUsagePct > reportThresholds[thresholdsIdx] {
			pct := int(memUsagePct * 100)
			logger.WithFields(log.Fields{
				"memUsagePct": pct,
				"mem":         mem,
				"memCap":      e.memCap,
				"args":        p.cmd.Args,
			}).Infof("Memory utilization increased to %d%%, pid: %d", pct, pid)
			e.pw.LogProcs(p, log.DebugLevel, nil)
			// Skip every threshold already passed. memUsagePct is capped at 1.0 and
			// the final threshold is 1, so the index never runs past the end.
			for memUsagePct > reportThresholds[thresholdsIdx] {
				thresholdsIdx++
			}
		}
		p.mutex.Unlock()
	}
}