OnSessionOpen()

in pkg/scheduler/plugins/overcommit/overcommit.go [72:140]
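
OnSessionOpen validates the configured overcommit-factor, computes the cluster's overcommitted idle capacity (total allocatable scaled by the factor, minus used resources), accounts the resources already claimed by inqueue and qualifying running jobs, and registers two session callbacks that admit or reject further jobs against that budget.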


func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter overcommit plugin ...")
	defer klog.V(4).Infof("Leaving overcommit plugin.")

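	// Read the overcommit-factor argument; values below 1.0 fall back to the default.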
	op.pluginArguments.GetFloat64(&op.overCommitFactor, overCommitFactor)
	if op.overCommitFactor < 1.0 {
		klog.Warningf("Invalid input %f for overcommit-factor, reason: overcommit-factor cannot be less than 1,"+
			" using default value: %f.", op.overCommitFactor, defaultOverCommitFactor)
		op.overCommitFactor = defaultOverCommitFactor
	}

	// calculate the idle resources of the whole cluster, overcommitted resources included
	total := api.EmptyResource()
	used := api.EmptyResource()
	for _, node := range ssn.Nodes {
		total.Add(node.Allocatable)
		used.Add(node.Used)
	}
	op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used)

	for _, job := range ssn.Jobs {
		// calculate resources of jobs already inqueue
		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
			op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
			continue
		}
		// calculate inqueue resources for running jobs
		// The check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' covers cases like the following:
		// consider a Spark job whose driver pod has completed while the podgroup stays Running; without this
		// check, the resources the job has already released would be reserved again.
		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
			job.PodGroup.Spec.MinResources != nil &&
			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
			allocated := util.GetAllocatedResource(job)
			inqueued := util.GetInqueueResource(job, allocated)
			op.inqueueResource.Add(inqueued)
		}
	}

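	// Enqueueable callback: decide whether a job may move to the Inqueue phase,
	// i.e. whether its minimal request still fits in the overcommitted idle capacity.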
	ssn.AddJobEnqueueableFn(op.Name(), func(obj interface{}) int {
		job := obj.(*api.JobInfo)
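		// Check against a copy of the session-wide inqueue total; the trial Add
		// below must not mutate op.inqueueResource.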
		idle := op.idleResource
		inqueue := api.EmptyResource()
		inqueue.Add(op.inqueueResource)
		if job.PodGroup.Spec.MinResources == nil {
			klog.V(4).Infof("Job <%s/%s> is bestEffort, permit to be inqueue.", job.Namespace, job.Name)
			return util.Permit
		}

		// TODO: if one more job is allowed to be inqueue beyond the overcommit-factor, a large job may be inqueue and create pods
		jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
		if inqueue.Add(jobMinReq).LessEqual(idle, api.Zero) {
			klog.V(4).Infof("Sufficient resources, permit job <%s/%s> to be inqueue", job.Namespace, job.Name)
			return util.Permit
		}
		klog.V(4).Infof("Resource in cluster is overused, reject job <%s/%s> to be inqueue",
			job.Namespace, job.Name)
		ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "resource in cluster is overused")
		return util.Reject
	})

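	// Enqueued callback: once a job is actually inqueue, add its minimal request
	// to the session-wide total so later enqueueable checks see it.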
	ssn.AddJobEnqueuedFn(op.Name(), func(obj interface{}) {
		job := obj.(*api.JobInfo)
		if job.PodGroup.Spec.MinResources == nil {
			return
		}
		jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
		op.inqueueResource.Add(jobMinReq)
	})
}
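
For intuition, the following minimal sketch reproduces the admission arithmetic above in isolation. It is not the Volcano api.Resource implementation: the simplified Resource type and the canEnqueue helper are hypothetical stand-ins that track only CPU millicores and memory bytes.

// Minimal sketch of the overcommit admission check. Resource and canEnqueue
// are hypothetical stand-ins, not Volcano's api.Resource.
package main

import "fmt"

// Resource is a simplified two-dimensional resource vector.
type Resource struct {
	MilliCPU float64
	Memory   float64
}

func (r Resource) Multi(f float64) Resource {
	return Resource{r.MilliCPU * f, r.Memory * f}
}

func (r Resource) Sub(o Resource) Resource {
	return Resource{r.MilliCPU - o.MilliCPU, r.Memory - o.Memory}
}

func (r Resource) Add(o Resource) Resource {
	return Resource{r.MilliCPU + o.MilliCPU, r.Memory + o.Memory}
}

func (r Resource) LessEqual(o Resource) bool {
	return r.MilliCPU <= o.MilliCPU && r.Memory <= o.Memory
}

// canEnqueue mirrors the enqueueable callback: a job may be inqueue only if
// the already-inqueued resources plus its own minimal request fit within the
// overcommitted idle capacity, idle = total*factor - used.
func canEnqueue(total, used, inqueue, jobMinReq Resource, factor float64) bool {
	idle := total.Multi(factor).Sub(used)
	return inqueue.Add(jobMinReq).LessEqual(idle)
}

func main() {
	total := Resource{MilliCPU: 8000, Memory: 16 << 30}   // one 8-core, 16Gi node
	used := Resource{MilliCPU: 6000, Memory: 12 << 30}    // currently allocated
	inqueue := Resource{MilliCPU: 2000, Memory: 4 << 30}  // already-admitted jobs
	jobMinReq := Resource{MilliCPU: 3000, Memory: 6 << 30} // candidate job

	fmt.Println(canEnqueue(total, used, inqueue, jobMinReq, 1.0)) // false: rejected
	fmt.Println(canEnqueue(total, used, inqueue, jobMinReq, 1.5)) // true: permitted
}

With factor 1.0 the example job is rejected (5000m total requested against 2000m idle); raising the factor to 1.5 inflates the idle budget to 6000m/12Gi and the same job is permitted, which is exactly the lever the overcommit-factor argument exposes.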