in pkg/scheduler/plugins/overcommit/overcommit.go [72:140]
func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(4).Infof("Enter overcommit plugin ...")
defer klog.V(4).Infof("Leaving overcommit plugin.")
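	// Read overcommit-factor from the plugin arguments; a factor below 1.0 would
	// shrink the cluster's apparent capacity, so it falls back to the default.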
	op.pluginArguments.GetFloat64(&op.overCommitFactor, overCommitFactor)
	if op.overCommitFactor < 1.0 {
		klog.Warningf("Invalid input %f for overcommit-factor, reason: overcommit-factor cannot be less than 1,"+
			" using default value: %f.", op.overCommitFactor, defaultOverCommitFactor)
		op.overCommitFactor = defaultOverCommitFactor
	}
	// calculate the idle resources of the whole cluster, overcommitted capacity included
	total := api.EmptyResource()
	used := api.EmptyResource()
	for _, node := range ssn.Nodes {
		total.Add(node.Allocatable)
		used.Add(node.Used)
	}
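	// idle = allocatable*overcommit-factor - used. For example, with overcommit-factor 1.2,
	// 100 allocatable CPUs and 30 CPUs in use, idle = 100*1.2 - 30 = 90 CPUs are
	// treated as available for enqueueing.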
	op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)
	for _, job := range ssn.Jobs {
		// accumulate the resources of jobs that are already inqueue
		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
			op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
			continue
		}
		// calculate the inqueue resources still held by running jobs.
		// The check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' guards against cases like the
		// following: a Spark job whose driver pod has completed while the podgroup stays running; without the
		// check, the resources the job has already released would be reserved again.
		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
			job.PodGroup.Spec.MinResources != nil &&
			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
			allocated := util.GetAllocatedResource(job)
			inqueued := util.GetInqueueResource(job, allocated)
			op.inqueueResource.Add(inqueued)
		}
	}
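	// Permit a job to enqueue only if its MinResources, together with everything
	// already inqueue, still fit into the overcommitted idle resources.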
	ssn.AddJobEnqueueableFn(op.Name(), func(obj interface{}) int {
		job := obj.(*api.JobInfo)
		idle := op.idleResource
		inqueue := api.EmptyResource()
		inqueue.Add(op.inqueueResource)
		if job.PodGroup.Spec.MinResources == nil {
			klog.V(4).Infof("Job <%s/%s> is bestEffort, permit it to be inqueue.", job.Namespace, job.Name)
			return util.Permit
		}
		// TODO: if one more job is allowed to enqueue beyond the overcommit-factor,
		// a large job may be enqueued and create pods
		jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
		if inqueue.Add(jobMinReq).LessEqual(idle, api.Zero) {
			klog.V(4).Infof("Sufficient resources, permit job <%s/%s> to be inqueue", job.Namespace, job.Name)
			return util.Permit
		}
		klog.V(4).Infof("Resources in cluster are overused, reject job <%s/%s> to be inqueue",
			job.Namespace, job.Name)
		ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "resource in cluster is overused")
		return util.Reject
	})
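	// Once a job is actually enqueued, account for its MinResources so that later
	// enqueue decisions in this session see the reduced headroom.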
	ssn.AddJobEnqueuedFn(op.Name(), func(obj interface{}) {
		job := obj.(*api.JobInfo)
		if job.PodGroup.Spec.MinResources == nil {
			return
		}
		jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
		op.inqueueResource.Add(jobMinReq)
	})
}
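
For a sense of the arithmetic, here is a minimal standalone sketch of the enqueue decision above, reusing the same api.Resource helpers this file calls (NewResource, Clone, Multi, Sub, Add, LessEqual); the module import path and the concrete quantities are assumptions for illustration, not part of the plugin:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"volcano.sh/volcano/pkg/scheduler/api" // assumed upstream module path
)

func main() {
	// Cluster with 100 allocatable CPUs, 30 already in use, overcommit-factor 1.2.
	total := api.NewResource(v1.ResourceList{v1.ResourceCPU: resource.MustParse("100")})
	used := api.NewResource(v1.ResourceList{v1.ResourceCPU: resource.MustParse("30")})
	idle := total.Clone().Multi(1.2).Sub(used) // 100*1.2 - 30 = 90 CPUs

	// A pending job whose MinResources ask for 80 CPUs fits: inqueue + minReq <= idle.
	jobMinReq := api.NewResource(v1.ResourceList{v1.ResourceCPU: resource.MustParse("80")})
	inqueue := api.EmptyResource()
	fmt.Println(inqueue.Add(jobMinReq).LessEqual(idle, api.Zero)) // true -> Permit
}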