func()

in pkg/scheduler/plugins/gang/gang.go [160:230]


// OnSessionClose reports the gang-scheduling outcome of every job in ssn.
//
// A job that is ready gets a PodGroupScheduled condition. A job that is not
// ready gets a PodGroupUnschedulableType condition whose message states how
// many tasks it is short of MinAvailable; the same message is stored in
// job.JobFitErrors, a retry is registered for the job in metrics, and every
// Allocated task without a recorded fit error receives one carrying it.
//
// The per-job unscheduled task count (zero for ready jobs) and the total
// number of not-ready jobs are pushed to metrics. A failed condition update
// is logged and does not stop processing of the remaining jobs.
func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
	notReadyJobs := 0

	for _, job := range ssn.Jobs {
		// Shortfall against MinAvailable; stays zero when the job is ready.
		var shortfall int32

		if job.Ready() {
			readyCond := &scheduling.PodGroupCondition{
				Type:               scheduling.PodGroupScheduled,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             "tasks in gang are ready to be scheduled",
				Message:            "",
			}

			if err := ssn.UpdatePodGroupCondition(job, readyCond); err != nil {
				klog.Errorf("Failed to update job <%s/%s> condition: %v",
					job.Namespace, job.Name, err)
			}
		} else {
			// Count pending tasks whose effective transaction context (the
			// last transaction when one is recorded, the current one
			// otherwise) is already in an allocated status.
			var schedulable int32
			for _, pending := range job.TaskStatusIndex[api.Pending] {
				txn := pending.GetTransactionContext()
				if last := pending.LastTransaction; last != nil {
					txn = *last
				}
				if api.AllocatedStatus(txn.Status) {
					schedulable++
				}
			}
			schedulable += job.ReadyTaskNum()

			shortfall = job.MinAvailable - schedulable

			// The fit error is read before JobFitErrors is overwritten below.
			reason := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
				shortfall, len(job.Tasks), job.FitError())
			job.JobFitErrors = reason

			notReadyJobs++
			metrics.RegisterJobRetries(job.Name)

			unschedulableCond := &scheduling.PodGroupCondition{
				Type:               scheduling.PodGroupUnschedulableType,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             v1beta1.NotEnoughResourcesReason,
				Message:            reason,
			}

			if err := ssn.UpdatePodGroupCondition(job, unschedulableCond); err != nil {
				klog.Errorf("Failed to update job <%s/%s> condition: %v",
					job.Namespace, job.Name, err)
			}

			// Allocated tasks with no fit error of their own inherit the
			// job-level message; existing entries are left untouched.
			for _, allocated := range job.TaskStatusIndex[api.Allocated] {
				if job.NodesFitErrors[allocated.UID] == nil {
					inherited := api.NewFitErrors()
					job.NodesFitErrors[allocated.UID] = inherited
					inherited.SetError(reason)
				}
			}
		}

		metrics.UpdateUnscheduleTaskCount(job.Name, int(shortfall))
	}

	metrics.UpdateUnscheduleJobCount(notReadyJobs)
}