func()

in runner/runners/queue.go [342:444]


func (c *QueueController) loop() {
	var watchCh chan runner.RunStatus
	var updateDoneCh chan interface{}
	updateRequested := false

	var idleLatency = c.inv.stat.Latency(stats.WorkerIdleLatency_ms)
	idleLatency.Time()

	tryUpdate := func() {
		if watchCh == nil && updateDoneCh == nil {
			updateRequested = false
			updateDoneCh = make(chan interface{})
			go func() {
				// Record filers by RunType that have requested updates
				c.updateLock.Lock()
				typesToUpdate := []runner.RunType{}
				for t, u := range c.updateReq {
					if u {
						typesToUpdate = append(typesToUpdate, t)
						c.updateReq[t] = false
					}
				}
				c.updateLock.Unlock()

				// Run all requested filer updates serially
				for _, t := range typesToUpdate {
					log.Infof("Running filer update for type %v", t)
					if err := c.filerMap[t].Filer.Update(); err != nil {
						fields := log.Fields{"err": err}
						if c.runningCmd != nil {
							fields["runType"] = t
							fields["tag"] = c.runningCmd.Tag
							fields["jobID"] = c.runningCmd.JobID
							fields["taskID"] = c.runningCmd.TaskID
						}
						log.WithFields(fields).Error("Error running Filer Update")
					}
				}
				updateDoneCh <- nil
			}()
		} else {
			updateRequested = true
		}
	}

	tryRun := func() {
		if watchCh == nil && updateDoneCh == nil && len(c.queue) > 0 && (c.statusManager.svcStatus.IsHealthy) {
			cmdID := c.queue[0]
			watchCh = c.runAndWatch(cmdID)
			idleLatency.Stop()
		}
	}

	for c.reqCh != nil {
		// Prefer to run an update first if we have one scheduled. If not updating, try running.
		if updateRequested {
			tryUpdate()
		} else {
			select {
			case <-c.updateCh:
				tryUpdate()
			default:
				tryRun()
			}
		}

		// Wait on update, updateDone, run start, or run abort.
		select {
		case <-c.updateCh:
			tryUpdate()

		case <-updateDoneCh:
			// Give run() a chance right after we've finished updating.
			updateDoneCh = nil
			tryRun()

		case req, ok := <-c.reqCh:
			// Handle run and abort requests.
			if !ok {
				c.reqCh = nil
				close(c.cancelTimerCh)
				return
			}
			switch r := req.(type) {
			case runReq:
				st, err := c.enqueue(r.cmd)
				r.resultCh <- result{st, err}
			case abortReq:
				st, err := c.abort(r.runID)
				r.resultCh <- result{st, err}
			}

		case <-watchCh:
			// Handle finished run by resetting state.
			watchCh = nil
			c.runningID = ""
			c.runningCmd = nil
			c.runningAbort = nil
			c.queue = c.queue[1:]
			idleLatency.Time()
		}
	}
}