in scheduler/server/stateful_scheduler.go [553:654]
// updateStats recomputes job and task gauges from s.inProgressJobs and publishes
// them, both as overall totals and broken down per requestor (gauge name
// suffixed with "_<requestor>"). It logs jobs whose TimeMarker is older than
// LongJobDuration, and publishes the sizes of several internal data structures.
func (s *statefulScheduler) updateStats() {
	remainingTasks := 0
	waitingTasks := 0
	runningTasks := 0
	jobsWaitingToStart := 0
	// map of requestor to job and task stats counts
	requestorsCounts := make(map[string]*requestorCounts)

	// get job and task counts by requestor, and overall jobs stats
	for _, job := range s.inProgressJobs {
		requestor := job.Job.Def.Requestor
		counts, ok := requestorsCounts[requestor]
		if !ok {
			// first time we've seen this requestor, initialize its map entry
			counts = &requestorCounts{}
			requestorsCounts[requestor] = counts
		}
		running := job.GetNumRunning()
		completed := job.GetNumCompleted()
		remaining := len(job.Tasks) - completed
		waiting := remaining - running
		status := job.getJobStatus()

		// accumulate totals independent of requestors
		remainingTasks += remaining
		waitingTasks += waiting
		runningTasks += running

		// job-level counts, overall and by requestor. The per-requestor
		// waiting-jobs count must be bumped here as well; otherwise the
		// per-requestor SchedWaitingJobsGauge is always published as 0.
		if job.TasksCompleted+job.TasksRunning == 0 {
			jobsWaitingToStart++
			counts.numJobsWaitingToStart++
		} else if status == domain.InProgress {
			counts.numJobsRunning++
		}
		counts.numRemainingTasks += remaining
		counts.numTasksRunning += running
		counts.numTasksWaitingToStart += waiting

		if time.Since(job.TimeMarker) > LongJobDuration {
			// Reset the marker so this job is reported again only after
			// another LongJobDuration has elapsed.
			job.TimeMarker = time.Now()
			log.WithFields(
				log.Fields{
					"requestor":      requestor,
					"jobType":        job.Job.Def.JobType,
					"jobId":          job.Job.Id,
					"tag":            job.Job.Def.Tag,
					"basis":          job.Job.Def.Basis,
					"priority":       job.Job.Def.Priority,
					"numTasks":       len(job.Tasks),
					"tasksRunning":   job.TasksRunning,
					"tasksCompleted": job.TasksCompleted,
					"runTime":        time.Since(job.TimeCreated),
				}).Info("Long-running job")
		}
	}

	// publish the requestor stats
	tasksRunningCheckSum := 0
	tasksWaitingCheckSum := 0
	tasksRemainingCheckSum := 0
	for requestor, counts := range requestorsCounts {
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningJobsGauge, requestor)).Update(int64(
			counts.numJobsRunning))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedWaitingJobsGauge, requestor)).Update(int64(
			counts.numJobsWaitingToStart))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumRunningTasksGauge, requestor)).Update(int64(
			counts.numTasksRunning))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedNumWaitingTasksGauge, requestor)).Update(int64(
			counts.numTasksWaitingToStart))
		s.stat.Gauge(fmt.Sprintf("%s_%s", stats.SchedInProgressTasksGauge, requestor)).Update(int64(
			counts.numRemainingTasks))
		tasksRunningCheckSum += counts.numTasksRunning
		tasksWaitingCheckSum += counts.numTasksWaitingToStart
		tasksRemainingCheckSum += counts.numRemainingTasks
	}
	// Sanity check: the per-requestor counts are built from the same values as
	// the overall totals, so these sums are expected to always match.
	if tasksRunningCheckSum != runningTasks || tasksWaitingCheckSum != waitingTasks || tasksRemainingCheckSum != remainingTasks {
		log.Errorf("stats checksum error\nrunning: expected: %d, got:%d\n waiting: expected: %d, got:%d\nremaining: expected:%d, got:%d",
			runningTasks, tasksRunningCheckSum, waitingTasks, tasksWaitingCheckSum, remainingTasks, tasksRemainingCheckSum)
	}

	// publish the rest of the stats
	s.stat.Gauge(stats.SchedAcceptedJobsGauge).Update(int64(len(s.inProgressJobs)))
	s.stat.Gauge(stats.SchedWaitingJobsGauge).Update(int64(jobsWaitingToStart))
	s.stat.Gauge(stats.SchedInProgressTasksGauge).Update(int64(remainingTasks))
	s.stat.Gauge(stats.SchedNumRunningTasksGauge).Update(int64(runningTasks))
	s.stat.Gauge(stats.SchedNumWaitingTasksGauge).Update(int64(waitingTasks))
	s.stat.Gauge(stats.SchedNumAsyncRunnersGauge).Update(int64(s.asyncRunner.NumRunning())) //TODO remove when done debugging

	// print internal data structure sizes
	// Only *LoadBasedAlg exposes GetDataStructureSizeStats; use a checked type
	// assertion so a different SchedAlg does not panic the stats update.
	if lbs, ok := s.config.SchedAlg.(*LoadBasedAlg); ok {
		for k, v := range lbs.GetDataStructureSizeStats() {
			s.stat.Gauge(k).Update(int64(v))
		}
	}
	s.stat.Gauge(stats.SchedTaskStartTimeMapSize).Update(int64(s.getSchedTaskStartTimeMapSize()))
	s.stat.Gauge(stats.SchedInProgressJobsSize).Update(int64(len(s.inProgressJobs)))
	s.stat.Gauge(stats.SchedRequestorMapSize).Update(int64(len(s.requestorMap)))
	s.stat.Gauge(stats.SchedRequestorHistorySize).Update(int64(s.requestorHistory.Len()))
	s.stat.Gauge(stats.SchedTaskDurationsSize).Update(int64(s.taskDurations.Len()))
	s.stat.Gauge(stats.SchedSagasSize).Update(int64(s.sagaCoord.GetNumSagas()))
	s.stat.Gauge(stats.SchedRunnersSize).Update(int64(s.asyncRunner.NumRunning()))
}