in runner/runners/queue.go [78:162]
// NewQueueRunner builds a QueueController together with its StatusManager and
// Invoker, starts the controller loop, and returns them wrapped in a Service.
//
// A nil stat is replaced with stats.NilStatsReceiver().
//
// capacity selects the mode:
//   - capacity > 0: the status manager is created with history 0 (unlimited).
//   - capacity == 0: single-runner mode; history is 1 and capacity is raised
//     to 1 so that a command can actually be run.
//   - capacity < 0: passed through unchanged with history 1.
//
// Every filer in filerMap with a non-nil IDC is waited on in the background,
// so this constructor returns without blocking on initialization. The service
// is marked initialized and healthy (and the update tickers are started) only
// after all of those channels have reported without error. If any of them
// reports an error, the WorkerDownloadInitFailure counter is incremented, the
// service status is set to uninitialized with that error, and the update
// tickers are not started. When no filer has an IDC the service is marked
// ready immediately.
func NewQueueRunner(
	exec execer.Execer,
	filerMap runner.RunTypeMap,
	output runner.OutputCreator,
	capacity int,
	stat stats.StatsReceiver,
	dirMonitor *stats.DirsMonitor,
	rID runner.RunnerID,
	preprocessors []func() error,
	postprocessors []func() error,
	uploader LogUploader,
) runner.Service {
	if stat == nil {
		stat = stats.NilStatsReceiver()
	}

	//FIXME(jschiller): proper history config rather than keying off of capacity and if this is a SingleRunner.
	history := 1
	if capacity > 0 {
		history = 0 // unlimited if acting as a queue (vs single runner).
	} else if capacity == 0 {
		capacity = 1 // singleRunner, override capacity so it can actually run a command.
	}

	statusManager := NewStatusManager(history)
	inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors, uploader)
	controller := &QueueController{
		statusManager: statusManager,
		inv:           inv,
		filerMap:      filerMap,
		updateReq:     make(map[runner.RunType]bool),
		capacity:      capacity,
		reqCh:         make(chan interface{}),
		updateCh:      make(chan interface{}),
		cancelTimerCh: make(chan interface{}),
	}
	run := &Service{controller, statusManager}

	// markReady publishes a healthy, initialized status and starts the
	// controller's update tickers. It is shared by the "nothing to wait for"
	// path and the "all filers initialized successfully" path.
	markReady := func() {
		statusManager.UpdateService(runner.ServiceStatus{Initialized: true, IsHealthy: true})
		stat.Gauge(stats.WorkerUnhealthy).Update(0)
		controller.startUpdateTickers()
	}

	// QueueRunner waits on filers with InitDoneChannels defined to return,
	// and will not serve requests if any return an error.
	var (
		wg      sync.WaitGroup
		errMu   sync.Mutex // guards initErr against concurrent writers
		initErr error      // first init error reported by any filer
	)
	wait := false
	for t, f := range filerMap {
		if f.IDC == nil {
			continue
		}
		log.Infof("Starting goroutine to wait on init for filer %v", t)
		wg.Add(1)
		wait = true
		go func(rt runner.RunType, idc snapshot.InitDoneCh) {
			defer wg.Done()
			err := <-idc
			if err == nil {
				return
			}
			log.Errorf("Init channel for filer %v returned error: %s", rt, err)
			// Several filers may fail at the same time; serialize the write
			// so the waiters do not race on initErr. Only the first error is
			// kept; the others are still visible in the log line above.
			errMu.Lock()
			if initErr == nil {
				initErr = err
			}
			errMu.Unlock()
		}(t, f.IDC)
	}

	if wait {
		// Wait for initialization to complete in a new goroutine to let this constructor return.
		go func() {
			wg.Wait()
			// Every writer has called wg.Done() before Wait returns, so
			// initErr can be read here without taking errMu.
			if initErr != nil {
				stat.Counter(stats.WorkerDownloadInitFailure).Inc(1)
				statusManager.UpdateService(runner.ServiceStatus{Initialized: false, Error: initErr})
				return
			}
			markReady()
		}()
	} else {
		markReady()
	}

	go controller.loop()
	return run
}