func()

in runner/execer/os/execer.go [128:243]


// monitorMem polls the memory utilization reported by e.getMemUtilization for
// the process p and aborts the process via e.memCapKill once utilization
// reaches e.memCap.
//
// The function performs one check immediately and then re-checks every 500ms.
// It returns when p.result is found to be set (the process is complete) or
// after it has recorded a memory-cap failure in p.result itself. If the
// process group id can be resolved, cleanupProcs(pgid) is deferred so that it
// runs when monitoring ends.
//
// memCh is handed through to e.memCapKill unchanged. stderr receives the
// process snapshot written by e.pw.LogProcs when the cap is hit.
func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus, stderr io.Writer) {
	pid := p.cmd.Process.Pid

	// procFields builds the identifying log fields anew for every log entry,
	// so each entry reads the values from p at the time it is emitted.
	procFields := func() log.Fields {
		return log.Fields{
			"pid":    pid,
			"tag":    p.Tag,
			"jobID":  p.JobID,
			"taskID": p.TaskID,
		}
	}

	pgid, err := syscall.Getpgid(pid)
	if err == nil {
		defer cleanupProcs(pgid)
	} else {
		failFields := procFields()
		failFields["error"] = err
		log.WithFields(failFields).Error("Error finding pgid")
	}

	log.WithFields(procFields()).Info("Monitoring memory")
	if _, procErr := e.pw.GetProcs(); procErr != nil {
		log.Error(procErr)
	}

	// Check once right away: being at or above the cap this early mostly means
	// utilization was already too high when the process was started.
	mem, err := e.getMemUtilization(pid)
	switch {
	case err != nil:
		log.Debugf("Error getting memory utilization: %s", err)
		e.stat.Gauge(stats.WorkerMemory).Update(-1)
	case mem >= e.memCap:
		msg := fmt.Sprintf("Critical error detected. Initial memory utilization of worker is higher than threshold, aborting process %d: %d > %d (%v)",
			pid, mem, e.memCap, p.cmd.Args)
		e.stat.Counter(stats.WorkerHighInitialMemoryUtilization).Inc(1)
		p.mutex.Lock()
		p.result = &scootexecer.ProcessStatus{
			State:    scootexecer.FAILED,
			Error:    msg,
			ExitCode: errors.HighInitialMemoryUtilizationExitCode,
		}
		// The process snapshot goes to the worker log and to the task's stderr log.
		e.pw.LogProcs(p, log.ErrorLevel, stderr)
		p.mutex.Unlock()
		e.memCapKill(p, mem, memCh)
		return
	}

	// Fractions of memCap at which an increase in utilization is logged. The
	// steps are coarse while utilization is low and get finer close to 100%.
	thresholds := []float64{0, .25, .5, .75, .85, .9, .93, .95, .96, .97, .98, .99, 1}
	nextThreshold := 0

	memTicker := time.NewTicker(500 * time.Millisecond)
	defer memTicker.Stop()

	for range memTicker.C {
		p.mutex.Lock()

		// A result that is already set means the process is complete.
		if p.result != nil {
			p.mutex.Unlock()
			log.WithFields(procFields()).Info("Finished monitoring memory")
			return
		}

		if _, procErr := e.pw.GetProcs(); procErr != nil {
			log.Error(procErr)
		}

		curMem, memErr := e.getMemUtilization(pid)
		if memErr != nil {
			p.mutex.Unlock()
			log.Debugf("Error getting memory utilization: %s", memErr)
			e.stat.Gauge(stats.WorkerMemory).Update(-1)
			continue
		}
		e.stat.Gauge(stats.WorkerMemory).Update(int64(curMem))

		// Utilization at or above memCap: record the failure and abort the process.
		if curMem >= e.memCap {
			msg := fmt.Sprintf("Cmd exceeded MemoryCap, aborting process %d: %d > %d (%v)", pid, curMem, e.memCap, p.cmd.Args)
			p.result = &scootexecer.ProcessStatus{
				State:    scootexecer.COMPLETE,
				Error:    msg,
				ExitCode: 1,
			}
			e.pw.LogProcs(p, log.ErrorLevel, stderr)
			p.mutex.Unlock()
			e.memCapKill(p, curMem, memCh)
			return
		}

		// Log only when utilization has crossed the next reporting threshold.
		usage := math.Min(1.0, float64(curMem)/float64(e.memCap))
		if usage > thresholds[nextThreshold] {
			pct := int(usage * 100)
			usageFields := procFields()
			usageFields["memUsagePct"] = pct
			usageFields["mem"] = curMem
			usageFields["memCap"] = e.memCap
			usageFields["args"] = p.cmd.Args
			log.WithFields(usageFields).Infof("Memory utilization increased to %d%%, pid: %d", pct, pid)
			e.pw.LogProcs(p, log.DebugLevel, nil)

			// Skip every threshold that has already been exceeded so the next
			// report happens only at a higher level.
			for usage > thresholds[nextThreshold] {
				nextThreshold++
			}
		}
		p.mutex.Unlock()
	}
}