pkg/scheduler/plugins/overcommit/overcommit.go (92 lines of code) (raw):

/* Copyright 2021 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package overcommit import ( v1 "k8s.io/api/core/v1" "k8s.io/klog" "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/plugins/util" ) const ( // PluginName is name of plugin PluginName = "overcommit" // overCommitFactor is resource overCommit factor for enqueue action // It determines the number of `pending` pods that the scheduler will tolerate // when the resources of the cluster is insufficient overCommitFactor = "overcommit-factor" // defaultOverCommitFactor defines the default overCommit resource factor for enqueue action defaultOverCommitFactor = 1.2 ) type overcommitPlugin struct { // Arguments given for the plugin pluginArguments framework.Arguments idleResource *api.Resource inqueueResource *api.Resource overCommitFactor float64 } // New function returns overcommit plugin object func New(arguments framework.Arguments) framework.Plugin { return &overcommitPlugin{ pluginArguments: arguments, idleResource: api.EmptyResource(), inqueueResource: api.EmptyResource(), overCommitFactor: defaultOverCommitFactor, } } func (op *overcommitPlugin) Name() string { return PluginName } /* User should give overcommit-factor through overcommit plugin arguments as format below: actions: "enqueue, allocate, backfill" tiers: - plugins: - name: overcommit arguments: overcommit-factor: 1.0 */ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("Enter overcommit plugin ...") defer klog.V(4).Infof("Leaving overcommit plugin.") op.pluginArguments.GetFloat64(&op.overCommitFactor, overCommitFactor) if op.overCommitFactor < 1.0 { klog.Warningf("Invalid input %f for overcommit-factor, reason: overcommit-factor cannot be less than 1,"+ " using default value: %f.", op.overCommitFactor, defaultOverCommitFactor) op.overCommitFactor = defaultOverCommitFactor } // calculate idle resources of total cluster, overcommit resources included total := api.EmptyResource() used := api.EmptyResource() for _, node := range ssn.Nodes { total.Add(node.Allocatable) used.Add(node.Used) } op.idleResource = total.Clone().Multi(op.overCommitFactor).Sub(used) for _, job := range ssn.Jobs { // calculate inqueue job resources if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil { op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) continue } // calculate inqueue resource for running jobs // the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition: // Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement. if job.PodGroup.Status.Phase == scheduling.PodGroupRunning && job.PodGroup.Spec.MinResources != nil && job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember { allocated := util.GetAllocatedResource(job) inqueued := util.GetInqueueResource(job, allocated) op.inqueueResource.Add(inqueued) } } ssn.AddJobEnqueueableFn(op.Name(), func(obj interface{}) int { job := obj.(*api.JobInfo) idle := op.idleResource inqueue := api.EmptyResource() inqueue.Add(op.inqueueResource) if job.PodGroup.Spec.MinResources == nil { klog.V(4).Infof("Job <%s/%s> is bestEffort, permit to be inqueue.", job.Namespace, job.Name) return util.Permit } //TODO: if allow 1 more job to be inqueue beyond overcommit-factor, large job may be inqueue and create pods jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources) if inqueue.Add(jobMinReq).LessEqual(idle, api.Zero) { klog.V(4).Infof("Sufficient resources, permit job <%s/%s> to be inqueue", job.Namespace, job.Name) return util.Permit } klog.V(4).Infof("Resource in cluster is overused, reject job <%s/%s> to be inqueue", job.Namespace, job.Name) ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "resource in cluster is overused") return util.Reject }) ssn.AddJobEnqueuedFn(op.Name(), func(obj interface{}) { job := obj.(*api.JobInfo) if job.PodGroup.Spec.MinResources == nil { return } jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources) op.inqueueResource.Add(jobMinReq) }) } func (op *overcommitPlugin) OnSessionClose(ssn *framework.Session) { op.idleResource = nil op.inqueueResource = nil }