in runner/runners/queue.go [342:444]
func (c *QueueController) loop() {
var watchCh chan runner.RunStatus
var updateDoneCh chan interface{}
updateRequested := false
var idleLatency = c.inv.stat.Latency(stats.WorkerIdleLatency_ms)
idleLatency.Time()
tryUpdate := func() {
if watchCh == nil && updateDoneCh == nil {
updateRequested = false
updateDoneCh = make(chan interface{})
go func() {
// Record filers by RunType that have requested updates
c.updateLock.Lock()
typesToUpdate := []runner.RunType{}
for t, u := range c.updateReq {
if u {
typesToUpdate = append(typesToUpdate, t)
c.updateReq[t] = false
}
}
c.updateLock.Unlock()
// Run all requested filer updates serially
for _, t := range typesToUpdate {
log.Infof("Running filer update for type %v", t)
if err := c.filerMap[t].Filer.Update(); err != nil {
fields := log.Fields{"err": err}
if c.runningCmd != nil {
fields["runType"] = t
fields["tag"] = c.runningCmd.Tag
fields["jobID"] = c.runningCmd.JobID
fields["taskID"] = c.runningCmd.TaskID
}
log.WithFields(fields).Error("Error running Filer Update")
}
}
updateDoneCh <- nil
}()
} else {
updateRequested = true
}
}
tryRun := func() {
if watchCh == nil && updateDoneCh == nil && len(c.queue) > 0 && (c.statusManager.svcStatus.IsHealthy) {
cmdID := c.queue[0]
watchCh = c.runAndWatch(cmdID)
idleLatency.Stop()
}
}
for c.reqCh != nil {
// Prefer to run an update first if we have one scheduled. If not updating, try running.
if updateRequested {
tryUpdate()
} else {
select {
case <-c.updateCh:
tryUpdate()
default:
tryRun()
}
}
// Wait on update, updateDone, run start, or run abort.
select {
case <-c.updateCh:
tryUpdate()
case <-updateDoneCh:
// Give run() a chance right after we've finished updating.
updateDoneCh = nil
tryRun()
case req, ok := <-c.reqCh:
// Handle run and abort requests.
if !ok {
c.reqCh = nil
close(c.cancelTimerCh)
return
}
switch r := req.(type) {
case runReq:
st, err := c.enqueue(r.cmd)
r.resultCh <- result{st, err}
case abortReq:
st, err := c.abort(r.runID)
r.resultCh <- result{st, err}
}
case <-watchCh:
// Handle finished run by resetting state.
watchCh = nil
c.runningID = ""
c.runningCmd = nil
c.runningAbort = nil
c.queue = c.queue[1:]
idleLatency.Time()
}
}
}