in pkg/scheduler/plugins/gang/gang.go [160:230]
// OnSessionClose records the final scheduling outcome of every job in the
// session: unschedulable gangs get a PodGroupUnschedulable condition (and
// per-task fit errors for their allocated tasks), ready gangs get a
// PodGroupScheduled condition, and the unschedulable task/job metrics are
// refreshed.
func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
	var unScheduleJobCount int
	for _, job := range ssn.Jobs {
		// Declared per iteration so a ready job always reports 0 instead of
		// depending on a manual reset of a function-scoped, loop-carried
		// counter (the previous stale-state hazard).
		var unreadyTaskCount int32
		if !job.Ready() {
			// A task counts as schedulable if it is already ready, or if it
			// is still Pending but its (last) transaction reached an
			// allocated status — i.e. it would have been placed had the
			// whole gang fit.
			schedulableTaskNum := job.ReadyTaskNum()
			for _, task := range job.TaskStatusIndex[api.Pending] {
				ctx := task.GetTransactionContext()
				if task.LastTransaction != nil {
					ctx = *task.LastTransaction
				}
				if api.AllocatedStatus(ctx.Status) {
					schedulableTaskNum++
				}
			}
			unreadyTaskCount = job.MinAvailable - schedulableTaskNum
			msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
				unreadyTaskCount, len(job.Tasks), job.FitError())
			job.JobFitErrors = msg
			unScheduleJobCount++
			metrics.RegisterJobRetries(job.Name)

			jc := &scheduling.PodGroupCondition{
				Type:               scheduling.PodGroupUnschedulableType,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             v1beta1.NotEnoughResourcesReason,
				Message:            msg,
			}
			if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {
				klog.Errorf("Failed to update job <%s/%s> condition: %v",
					job.Namespace, job.Name, err)
			}

			// Allocated tasks carry no individual fit error of their own;
			// attach the job-level message so consumers can see why the
			// gang as a whole failed to schedule.
			for _, taskInfo := range job.TaskStatusIndex[api.Allocated] {
				fitError := job.NodesFitErrors[taskInfo.UID]
				if fitError != nil {
					continue
				}
				fitError = api.NewFitErrors()
				job.NodesFitErrors[taskInfo.UID] = fitError
				fitError.SetError(msg)
			}
		} else {
			jc := &scheduling.PodGroupCondition{
				Type:               scheduling.PodGroupScheduled,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             "tasks in gang are ready to be scheduled",
				Message:            "",
			}
			if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {
				klog.Errorf("Failed to update job <%s/%s> condition: %v",
					job.Namespace, job.Name, err)
			}
		}
		// Reported for every job (0 when ready) so a previously-stuck job
		// that becomes schedulable has its gauge overwritten, not retained.
		metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount))
	}
	metrics.UpdateUnscheduleJobCount(unScheduleJobCount)
}