func()

in scheduler/server/stateful_scheduler.go [553:654]


// updateStats walks every in-progress job and publishes scheduler gauges:
//   - overall totals of remaining, running and waiting tasks, accepted jobs and
//     jobs waiting to start;
//   - the same job/task counts broken down per requestor (gauge name suffixed
//     with "_<requestor>");
//   - sizes of several internal data structures.
//
// As a side effect, any job whose TimeMarker is older than LongJobDuration is
// logged as "Long-running job" and its TimeMarker is reset to now, so each job
// is reported at most once per LongJobDuration.
func (s *statefulScheduler) updateStats() {
	remainingTasks := 0
	waitingTasks := 0
	runningTasks := 0
	jobsWaitingToStart := 0

	// requestor -> job and task stats counts for that requestor
	requestorsCounts := make(map[string]*requestorCounts)

	// get job and task counts by requestor, and overall jobs stats
	for _, job := range s.inProgressJobs {
		requestor := job.Job.Def.Requestor
		counts, ok := requestorsCounts[requestor]
		if !ok {
			// first time we've seen this requestor, initialize its map entry
			counts = &requestorCounts{}
			requestorsCounts[requestor] = counts
		}

		running := job.GetNumRunning()
		completed := job.GetNumCompleted()
		remaining := len(job.Tasks) - completed
		waiting := remaining - running
		status := job.getJobStatus()

		// accumulate totals independent of requestors
		remainingTasks += remaining
		waitingTasks += waiting
		runningTasks += running

		// totals by requestor
		if job.TasksCompleted+job.TasksRunning == 0 {
			// Nothing has started or finished yet. Count the job as waiting both
			// overall and for its requestor, so the per-requestor waiting-jobs
			// gauge published below actually reflects it.
			jobsWaitingToStart++
			counts.numJobsWaitingToStart++
		} else if status == domain.InProgress {
			counts.numJobsRunning++
		}
		counts.numRemainingTasks += remaining
		counts.numTasksRunning += running
		counts.numTasksWaitingToStart += waiting

		if time.Since(job.TimeMarker) > LongJobDuration {
			// Reset the marker so the same job is not logged on every stats pass.
			job.TimeMarker = time.Now()
			log.WithFields(
				log.Fields{
					"requestor":      job.Job.Def.Requestor,
					"jobType":        job.Job.Def.JobType,
					"jobId":          job.Job.Id,
					"tag":            job.Job.Def.Tag,
					"basis":          job.Job.Def.Basis,
					"priority":       job.Job.Def.Priority,
					"numTasks":       len(job.Tasks),
					"tasksRunning":   job.TasksRunning,
					"tasksCompleted": job.TasksCompleted,
					"runTime":        time.Since(job.TimeCreated),
				}).Info("Long-running job")
		}
	}

	// publish the requestor stats, summing task counts as we go so they can be
	// cross-checked against the overall totals
	tasksRunningCheckSum := 0
	tasksWaitingCheckSum := 0
	tasksRemainingCheckSum := 0
	for requestor, counts := range requestorsCounts {
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningJobsGauge, requestor)).Update(int64(
			counts.numJobsRunning))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedWaitingJobsGauge, requestor)).Update(int64(
			counts.numJobsWaitingToStart))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningTasksGauge, requestor)).Update(int64(
			counts.numTasksRunning))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumWaitingTasksGauge, requestor)).Update(int64(
			counts.numTasksWaitingToStart))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedInProgressTasksGauge, requestor)).Update(int64(
			counts.numRemainingTasks))
		tasksRunningCheckSum += counts.numTasksRunning
		tasksWaitingCheckSum += counts.numTasksWaitingToStart
		tasksRemainingCheckSum += counts.numRemainingTasks
	}

	// Sanity check: per-requestor task counts must sum to the overall totals.
	if tasksRunningCheckSum != runningTasks || tasksWaitingCheckSum != waitingTasks || tasksRemainingCheckSum != remainingTasks {
		log.Errorf("stats checksum error\nrunning: expected: %d, got:%d\n waiting: expected: %d, got:%d\nremaining: expected:%d, got:%d",
			runningTasks, tasksRunningCheckSum, waitingTasks, tasksWaitingCheckSum, remainingTasks, tasksRemainingCheckSum)
	}

	// publish the rest of the stats
	s.stat.Gauge(stats.SchedAcceptedJobsGauge).Update(int64(len(s.inProgressJobs)))
	s.stat.Gauge(stats.SchedWaitingJobsGauge).Update(int64(jobsWaitingToStart))
	s.stat.Gauge(stats.SchedInProgressTasksGauge).Update(int64(remainingTasks))
	s.stat.Gauge(stats.SchedNumRunningTasksGauge).Update(int64(runningTasks))
	s.stat.Gauge(stats.SchedNumWaitingTasksGauge).Update(int64(waitingTasks))
	s.stat.Gauge(stats.SchedNumAsyncRunnersGauge).Update(int64(s.asyncRunner.NumRunning())) //TODO remove when done debugging

	// print internal data structure sizes
	// GetDataStructureSizeStats is reached through the concrete *LoadBasedAlg.
	// Use the checked assertion so a different SchedAlg implementation skips
	// these gauges instead of panicking the stats pass.
	if lbs, ok := s.config.SchedAlg.(*LoadBasedAlg); ok {
		for k, v := range lbs.GetDataStructureSizeStats() {
			s.stat.Gauge(k).Update(int64(v))
		}
	}
	s.stat.Gauge(stats.SchedTaskStartTimeMapSize).Update(int64(s.getSchedTaskStartTimeMapSize()))
	s.stat.Gauge(stats.SchedInProgressJobsSize).Update(int64(len(s.inProgressJobs)))
	s.stat.Gauge(stats.SchedRequestorMapSize).Update(int64(len(s.requestorMap)))
	s.stat.Gauge(stats.SchedRequestorHistorySize).Update(int64(s.requestorHistory.Len()))
	s.stat.Gauge(stats.SchedTaskDurationsSize).Update(int64(s.taskDurations.Len()))
	s.stat.Gauge(stats.SchedSagasSize).Update(int64(s.sagaCoord.GetNumSagas()))
	s.stat.Gauge(stats.SchedRunnersSize).Update(int64(s.asyncRunner.NumRunning()))
}