func NewQueueRunner()

in runner/runners/queue.go [78:162]


// NewQueueRunner builds a runner.Service backed by a QueueController and starts
// the controller's loop in a background goroutine before returning.
//
// exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors
// and uploader are handed to NewInvoker unchanged; filerMap is additionally
// stored on the controller. A nil stat is replaced with stats.NilStatsReceiver().
//
// capacity selects the mode: a positive value is used as the queue capacity
// with a status history of 0 (unlimited); zero means single-runner mode, in
// which case capacity is overridden to 1 and the status history is 1. A negative
// capacity is passed through unchanged with a status history of 1.
//
// If any filer in filerMap has a non-nil IDC (init-done channel), the service is
// only marked initialized after every such channel has delivered a value. If any
// of them delivers a non-nil error, the service is marked uninitialized with the
// first error recorded and the update tickers are not started. The wait happens
// in a background goroutine, so the constructor itself does not block on it.
func NewQueueRunner(
	exec execer.Execer,
	filerMap runner.RunTypeMap,
	output runner.OutputCreator,
	capacity int,
	stat stats.StatsReceiver,
	dirMonitor *stats.DirsMonitor,
	rID runner.RunnerID,
	preprocessors []func() error,
	postprocessors []func() error,
	uploader LogUploader,
) runner.Service {
	if stat == nil {
		stat = stats.NilStatsReceiver()
	}

	//FIXME(jschiller): proper history config rather than keying off of capacity and if this is a SingleRunner.
	history := 1
	if capacity > 0 {
		history = 0 // unlimited if acting as a queue (vs single runner).
	} else if capacity == 0 {
		capacity = 1 // singleRunner, override capacity so it can actually run a command.
	}

	statusManager := NewStatusManager(history)
	inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors, uploader)

	controller := &QueueController{
		statusManager: statusManager,
		inv:           inv,
		filerMap:      filerMap,
		updateReq:     make(map[runner.RunType]bool),
		capacity:      capacity,
		reqCh:         make(chan interface{}),
		updateCh:      make(chan interface{}),
		cancelTimerCh: make(chan interface{}),
	}
	run := &Service{controller, statusManager}

	// markReady publishes the healthy/initialized status and starts the update
	// tickers. It is shared by the "no init to wait for" path and the
	// "all inits succeeded" path so the two cannot drift apart.
	markReady := func() {
		statusManager.UpdateService(runner.ServiceStatus{Initialized: true, IsHealthy: true})
		stat.Gauge(stats.WorkerUnhealthy).Update(0)
		controller.startUpdateTickers()
	}

	// QueueRunner waits on filers with InitDoneChannels defined to return,
	// and will not serve requests if any return an error
	var (
		wg      sync.WaitGroup
		errMu   sync.Mutex // guards initErr: several waiter goroutines may fail concurrently
		initErr error
	)
	wait := false

	for t, f := range filerMap {
		if f.IDC == nil {
			continue
		}
		log.Infof("Starting goroutine to wait on init for filer %v", t)
		wg.Add(1)
		wait = true

		go func(rt runner.RunType, idc snapshot.InitDoneCh) {
			defer wg.Done()
			err := <-idc
			if err == nil {
				return
			}
			log.Errorf("Init channel for filer %v returned error: %s", rt, err)

			// Keep the first reported error; every failure is still logged above.
			errMu.Lock()
			if initErr == nil {
				initErr = err
			}
			errMu.Unlock()
		}(t, f.IDC)
	}

	if wait {
		// Wait for initialization to complete in a new goroutine to let this constructor return
		go func() {
			wg.Wait()
			// All writers have called wg.Done() before Wait returns, so reading
			// initErr here needs no lock.
			if initErr != nil {
				stat.Counter(stats.WorkerDownloadInitFailure).Inc(1)
				statusManager.UpdateService(runner.ServiceStatus{Initialized: false, Error: initErr})
				return
			}
			markReady()
		}()
	} else {
		markReady()
	}

	go controller.loop()

	return run
}