pkg/scheduler/plugins/drf/drf.go (500 lines of code) (raw):

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package drf

import (
	"fmt"
	"math"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/api/helpers"
	"volcano.sh/volcano/pkg/scheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/metrics"
	"volcano.sh/volcano/pkg/scheduler/plugins/util"
)

// PluginName indicates name of volcano scheduler plugin.
const PluginName = "drf"

// shareDelta is the tolerance used when comparing two dominant shares:
// shares whose difference is within this delta are treated as equal.
var shareDelta = 0.000001

// hierarchicalNode represents the node hierarchy
// and the corresponding weight and drf attribute
type hierarchicalNode struct {
	parent *hierarchicalNode
	attr   *drfAttr
	// If the node is a leaf node,
	// request represents the request of the job.
	request *api.Resource
	// weight adjusts this node's share relative to its siblings.
	weight float64
	// saturated marks that this node (or all of its children) cannot
	// consume any more of the demanding resources.
	saturated bool
	// hierarchy is this node's path segment name under its parent.
	hierarchy string
	children  map[string]*hierarchicalNode
}

// Clone deep-copies the node and its whole subtree, attaching the copy
// to the given parent. attr and request are cloned so the copy can be
// mutated without touching the original tree.
func (node *hierarchicalNode) Clone(parent *hierarchicalNode) *hierarchicalNode {
	newNode := &hierarchicalNode{
		parent: parent,
		attr: &drfAttr{
			share:            node.attr.share,
			dominantResource: node.attr.dominantResource,
			allocated:        node.attr.allocated.Clone(),
		},
		request:   node.request.Clone(),
		weight:    node.weight,
		saturated: node.saturated,
		hierarchy: node.hierarchy,
		children:  nil,
	}
	if node.children != nil {
		newNode.children = map[string]*hierarchicalNode{}
		for _, child := range node.children {
			newNode.children[child.hierarchy] = child.Clone(newNode)
		}
	}
	return newNode
}

// resourceSaturated returns true if any resource of the job is saturated or
// the job demands fully allocated resource
func resourceSaturated(allocated *api.Resource,
	jobRequest *api.Resource, demandingResources map[v1.ResourceName]bool) bool {
	for _, rn := range allocated.ResourceNames() {
		// allocation already meets (or exceeds) the job's request for rn
		if allocated.Get(rn) != 0 && jobRequest.Get(rn) != 0 &&
			allocated.Get(rn) >= jobRequest.Get(rn) {
			return true
		}
		// the job wants rn but rn is no longer a demanding (available) resource
		if !demandingResources[rn] && jobRequest.Get(rn) != 0 {
			return true
		}
	}
	return false
}

// drfAttr holds the DRF bookkeeping (dominant share and allocation) for a
// job, a namespace, or a hierarchy node.
type drfAttr struct {
	share            float64
	dominantResource string
	allocated        *api.Resource
}

func (attr *drfAttr) String() string {
	return fmt.Sprintf("dominant resource <%s>, dominant share %f, allocated %s",
		attr.dominantResource, attr.share, attr.allocated)
}

// drfPlugin implements DRF-based job ordering, optional namespace fairness,
// and optional hierarchical DRF (HDRF) over queues.
type drfPlugin struct {
	totalResource  *api.Resource
	totalAllocated *api.Resource

	// Key is Job ID
	jobAttrs map[api.JobID]*drfAttr

	// map[namespaceName]->attr
	namespaceOpts map[string]*drfAttr

	// hierarchical tree root
	hierarchicalRoot *hierarchicalNode

	// Arguments given for the plugin
	pluginArguments framework.Arguments
}

// New return drf plugin
func New(arguments framework.Arguments) framework.Plugin {
	return &drfPlugin{
		totalResource:  api.EmptyResource(),
		totalAllocated: api.EmptyResource(),
		jobAttrs:       map[api.JobID]*drfAttr{},
		namespaceOpts:
map[string]*drfAttr{},
		hierarchicalRoot: &hierarchicalNode{
			attr:      &drfAttr{allocated: api.EmptyResource()},
			request:   api.EmptyResource(),
			hierarchy: "root",
			weight:    1,
			children:  map[string]*hierarchicalNode{},
		},
		pluginArguments: arguments,
	}
}

// Name returns the name of this plugin.
func (drf *drfPlugin) Name() string {
	return PluginName
}

// HierarchyEnabled returns if hierarchy is enabled
func (drf *drfPlugin) HierarchyEnabled(ssn *framework.Session) bool {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if plugin.Name != PluginName {
				continue
			}
			// first matching entry decides; later tiers are not consulted
			return plugin.EnabledHierarchy != nil && *plugin.EnabledHierarchy
		}
	}
	return false
}

// NamespaceOrderEnabled returns the NamespaceOrder for this plugin is enabled in this session or not
func (drf *drfPlugin) NamespaceOrderEnabled(ssn *framework.Session) bool {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if plugin.Name != PluginName {
				continue
			}
			return plugin.EnabledNamespaceOrder != nil && *plugin.EnabledNamespaceOrder
		}
	}
	return false
}

// compareQueues walks both queues' hierarchy paths from the root in lockstep
// and returns a negative value if lqueue should be scheduled first, a
// positive value if rqueue should, and 0 if they tie at every level.
func (drf *drfPlugin) compareQueues(root *hierarchicalNode, lqueue *api.QueueInfo, rqueue *api.QueueInfo) float64 {
	lnode := root
	lpaths := strings.Split(lqueue.Hierarchy, "/")
	rnode := root
	rpaths := strings.Split(rqueue.Hierarchy, "/")
	depth := 0
	if len(lpaths) < len(rpaths) {
		depth = len(lpaths)
	} else {
		depth = len(rpaths)
	}
	for i := 0; i < depth; i++ {
		// Saturated nodes have minimum priority,
		// so that demanding nodes will be popped first.
		if !lnode.saturated && rnode.saturated {
			return -1
		}
		if lnode.saturated && !rnode.saturated {
			return 1
		}
		// equal weighted shares at this level: descend one level deeper
		if lnode.attr.share/lnode.weight == rnode.attr.share/rnode.weight {
			if i < depth-1 {
				lnode = lnode.children[lpaths[i+1]]
				rnode = rnode.children[rpaths[i+1]]
			}
		} else {
			return lnode.attr.share/lnode.weight - rnode.attr.share/rnode.weight
		}
	}
	return 0
}

// OnSessionOpen computes initial DRF shares for all jobs and registers the
// preemptable/job-order callbacks (plus queue-order, reclaimable and
// namespace-order callbacks when the respective features are enabled).
func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
	// Prepare scheduling data for this session.
drf.totalResource.Add(ssn.TotalResource)

	klog.V(4).Infof("Total Allocatable %s", drf.totalResource)

	namespaceOrderEnabled := drf.NamespaceOrderEnabled(ssn)
	hierarchyEnabled := drf.HierarchyEnabled(ssn)

	for _, job := range ssn.Jobs {
		attr := &drfAttr{
			allocated: api.EmptyResource(),
		}

		// Sum the requests of all tasks already in an allocated status.
		for status, tasks := range job.TaskStatusIndex {
			if api.AllocatedStatus(status) {
				for _, t := range tasks {
					attr.allocated.Add(t.Resreq)
				}
			}
		}

		// Calculate the init share of Job
		drf.updateJobShare(job.Namespace, job.Name, attr)

		drf.jobAttrs[job.UID] = attr

		if namespaceOrderEnabled {
			nsOpts, found := drf.namespaceOpts[job.Namespace]
			if !found {
				nsOpts = &drfAttr{
					allocated: api.EmptyResource(),
				}
				drf.namespaceOpts[job.Namespace] = nsOpts
			}
			// all task in job should have the same namespace with job
			nsOpts.allocated.Add(attr.allocated)
			drf.updateNamespaceShare(job.Namespace, nsOpts)
		}
		if hierarchyEnabled {
			queue := ssn.Queues[job.Queue]
			drf.totalAllocated.Add(attr.allocated)
			drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
		}
	}

	// preemptableFn selects victims whose removal would not make the
	// preemptor's DRF share exceed the victim job's share.
	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo

		addVictim := func(candidate *api.TaskInfo) {
			victims = append(victims, candidate)
		}

		if namespaceOrderEnabled {
			// apply the namespace share policy on preemptee firstly
			lWeight := ssn.NamespaceInfo[api.NamespaceName(preemptor.Namespace)].GetWeight()
			lNsAtt := drf.namespaceOpts[preemptor.Namespace]
			// preemptor namespace's weighted share after it gains Resreq
			lNsAlloc := lNsAtt.allocated.Clone().Add(preemptor.Resreq)
			_, lNsShare := drf.calculateShare(lNsAlloc, drf.totalResource)
			lNsShareWeighted := lNsShare / float64(lWeight)

			// per-namespace running allocation, shared across preemptees of
			// the same namespace so multiple victims are accounted cumulatively
			namespaceAllocation := map[string]*api.Resource{}

			// undecidedPreemptees means this policy could not judge preemptee is
			// preemptable or not and left it to next policy
			undecidedPreemptees := []*api.TaskInfo{}

			for _, preemptee := range preemptees {
				if preemptor.Namespace == preemptee.Namespace {
					// policy is disabled
// when they are in the same namespace
					undecidedPreemptees = append(undecidedPreemptees, preemptee)
					continue
				}

				// compute the preemptee namespace weighted share after preemption
				nsAllocation, found := namespaceAllocation[preemptee.Namespace]
				if !found {
					rNsAtt := drf.namespaceOpts[preemptee.Namespace]
					nsAllocation = rNsAtt.allocated.Clone()
					namespaceAllocation[preemptee.Namespace] = nsAllocation
				}
				rWeight := ssn.NamespaceInfo[api.NamespaceName(preemptee.Namespace)].GetWeight()
				// Sub mutates nsAllocation in place, so the deduction carries
				// over to later preemptees of the same namespace.
				rNsAlloc := nsAllocation.Sub(preemptee.Resreq)
				_, rNsShare := drf.calculateShare(rNsAlloc, drf.totalResource)
				rNsShareWeighted := rNsShare / float64(rWeight)

				// to avoid ping pong actions, the preemptee namespace should
				// have the higher weighted share after preemption.
				if lNsShareWeighted < rNsShareWeighted {
					addVictim(preemptee)
					continue
				}
				if lNsShareWeighted-rNsShareWeighted > shareDelta {
					continue
				}

				// equal namespace order leads to judgement of jobOrder
				undecidedPreemptees = append(undecidedPreemptees, preemptee)
			}
			preemptees = undecidedPreemptees
		}

		// DRF job-level policy: preemptor share (after gaining Resreq) must
		// stay at or below the preemptee job's share (after losing Resreq).
		latt := drf.jobAttrs[preemptor.Job]
		lalloc := latt.allocated.Clone().Add(preemptor.Resreq)
		_, ls := drf.calculateShare(lalloc, drf.totalResource)

		allocations := map[api.JobID]*api.Resource{}

		for _, preemptee := range preemptees {
			if _, found := allocations[preemptee.Job]; !found {
				ratt := drf.jobAttrs[preemptee.Job]
				allocations[preemptee.Job] = ratt.allocated.Clone()
			}
			ralloc := allocations[preemptee.Job].Sub(preemptee.Resreq)
			_, rs := drf.calculateShare(ralloc, drf.totalResource)

			if ls < rs || math.Abs(ls-rs) <= shareDelta {
				addVictim(preemptee)
			}
		}

		klog.V(4).Infof("Victims from DRF plugins are %+v", victims)

		return victims, util.Permit
	}

	ssn.AddPreemptableFn(drf.Name(), preemptableFn)

	if hierarchyEnabled {
		// queueOrderFn orders queues by hierarchical DRF comparison.
		queueOrderFn := func(l interface{}, r interface{}) int {
			lv := l.(*api.QueueInfo)
			rv := r.(*api.QueueInfo)
			ret := drf.compareQueues(drf.hierarchicalRoot, lv, rv)
			if ret < 0 {
				return -1
			}
			if ret > 0 {
				return 1
			}
			return 0
		}
ssn.AddQueueOrderFn(drf.Name(), queueOrderFn)

		// reclaimFn simulates, on a cloned HDRF tree, giving the reclaimer
		// its request and taking each reclaimee's request away; a reclaimee
		// is a victim when its queue would still rank after the reclaimer's.
		reclaimFn := func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			var victims []*api.TaskInfo
			// clone hdrf tree
			totalAllocated := drf.totalAllocated.Clone()
			root := drf.hierarchicalRoot.Clone(nil)

			// update reclaimer hdrf
			ljob := ssn.Jobs[reclaimer.Job]
			lqueue := ssn.Queues[ljob.Queue]
			ljob = ljob.Clone()
			attr := drf.jobAttrs[ljob.UID]
			lattr := &drfAttr{
				allocated: attr.allocated.Clone(),
			}
			lattr.allocated.Add(reclaimer.Resreq)
			totalAllocated.Add(reclaimer.Resreq)
			drf.updateShare(lattr)
			drf.UpdateHierarchicalShare(root, totalAllocated, ljob, lattr, lqueue.Hierarchy, lqueue.Weights)

			for _, preemptee := range reclaimees {
				rjob := ssn.Jobs[preemptee.Job]
				rqueue := ssn.Queues[rjob.Queue]

				// update hdrf of reclaimee job
				totalAllocated.Sub(preemptee.Resreq)
				rjob = rjob.Clone()
				attr := drf.jobAttrs[rjob.UID]
				rattr := &drfAttr{
					allocated: attr.allocated.Clone(),
				}
				rattr.allocated.Sub(preemptee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				// compare hdrf of queues
				ret := drf.compareQueues(root, lqueue, rqueue)

				// resume hdrf of reclaimee job
				totalAllocated.Add(preemptee.Resreq)
				rattr.allocated.Add(preemptee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				if ret < 0 {
					victims = append(victims, preemptee)
				}
				// NOTE(review): this trailing continue is a no-op (end of the
				// loop body) — presumably a leftover from an earlier shape of
				// the loop; verify intent before removing.
				if ret > shareDelta {
					continue
				}
			}
			klog.V(4).Infof("Victims from HDRF plugins are %+v", victims)
			return victims, util.Permit
		}
		ssn.AddReclaimableFn(drf.Name(), reclaimFn)
	}

	// jobOrderFn gives jobs with the smaller DRF share higher priority.
	jobOrderFn := func(l interface{}, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		klog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %v, <%v/%v> share state: %v",
			lv.Namespace, lv.Name, drf.jobAttrs[lv.UID].share, rv.Namespace, rv.Name, drf.jobAttrs[rv.UID].share)

		if drf.jobAttrs[lv.UID].share == drf.jobAttrs[rv.UID].share {
			return 0
		}

		if
drf.jobAttrs[lv.UID].share < drf.jobAttrs[rv.UID].share {
			return -1
		}

		return 1
	}

	ssn.AddJobOrderFn(drf.Name(), jobOrderFn)

	// namespaceOrderFn gives the namespace with the smaller weighted share
	// (share divided by namespace weight) higher priority.
	namespaceOrderFn := func(l interface{}, r interface{}) int {
		lv := l.(api.NamespaceName)
		rv := r.(api.NamespaceName)

		lOpt := drf.namespaceOpts[string(lv)]
		rOpt := drf.namespaceOpts[string(rv)]

		lWeight := ssn.NamespaceInfo[lv].GetWeight()
		rWeight := ssn.NamespaceInfo[rv].GetWeight()

		klog.V(4).Infof("DRF NamespaceOrderFn: <%v> share state: %f, weight %v, <%v> share state: %f, weight %v",
			lv, lOpt.share, lWeight, rv, rOpt.share, rWeight)

		lWeightedShare := lOpt.share / float64(lWeight)
		rWeightedShare := rOpt.share / float64(rWeight)

		metrics.UpdateNamespaceWeight(string(lv), lWeight)
		metrics.UpdateNamespaceWeight(string(rv), rWeight)
		metrics.UpdateNamespaceWeightedShare(string(lv), lWeightedShare)
		metrics.UpdateNamespaceWeightedShare(string(rv), rWeightedShare)

		if lWeightedShare == rWeightedShare {
			return 0
		}

		if lWeightedShare < rWeightedShare {
			return -1
		}
		return 1
	}

	if namespaceOrderEnabled {
		ssn.AddNamespaceOrderFn(drf.Name(), namespaceOrderFn)
	}

	// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
		// AllocateFunc keeps job / namespace / hierarchy shares in sync when
		// a task gains resources.
		AllocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Add(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Add(event.Task.Resreq)
				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}
			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]
				drf.totalAllocated.Add(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			klog.V(4).Infof("DRF AllocateFunc: task <%v/%v>, resreq <%v>,  share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
		// DeallocateFunc mirrors AllocateFunc for resource release.
		DeallocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Sub(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Sub(event.Task.Resreq)
				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}
			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]
				drf.totalAllocated.Sub(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			// NOTE(review): message says "EvictFunc" although this is the
			// DeallocateFunc handler; kept as-is since it is a runtime string.
			klog.V(4).Infof("DRF EvictFunc: task <%v/%v>, resreq <%v>,  share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
	})
}

// updateNamespaceShare recomputes the namespace's dominant share and
// publishes it to the metrics subsystem.
func (drf *drfPlugin) updateNamespaceShare(namespaceName string, attr *drfAttr) {
	drf.updateShare(attr)
	metrics.UpdateNamespaceShare(namespaceName, attr.share)
}

// build hierarchy if the node does not exist
func (drf *drfPlugin) buildHierarchy(root *hierarchicalNode, job *api.JobInfo, attr *drfAttr,
	hierarchy, hierarchicalWeights string) {
	inode := root
	paths := strings.Split(hierarchy, "/")
	weights := strings.Split(hierarchicalWeights, "/")

	// Walk (and create as needed) the intermediate nodes along the path;
	// paths[0] is the root itself, so start at 1.
	for i := 1; i < len(paths); i++ {
		if child, ok := inode.children[paths[i]]; ok {
			inode = child
		} else {
			// unparsable or sub-1 weights are clamped to 1
			fweight, _ := strconv.ParseFloat(weights[i], 64)
			if fweight < 1 {
				fweight = 1
			}
			child = &hierarchicalNode{
				weight:    fweight,
				hierarchy: paths[i],
				request:   api.EmptyResource(),
				attr: &drfAttr{
					allocated: api.EmptyResource(),
				},
				children: make(map[string]*hierarchicalNode),
			}
			klog.V(4).Infof("Node %s added to %s, weight %f",
				child.hierarchy, inode.hierarchy, fweight)
			inode.children[paths[i]] = child
			child.parent = inode
			inode = child
		}
	}

	// Attach the job itself as a leaf under the deepest path node.
	child := &hierarchicalNode{
		weight:    1,
		attr:      attr,
		hierarchy: string(job.UID),
		request:   job.TotalRequest.Clone(),
		children:  nil,
	}
	inode.children[string(job.UID)] = child
	// update drf attribute bottom up
	klog.V(4).Infof("Job <%s/%s> added to %s, weights %s, attr %v, total request: %s",
		job.Namespace, job.Name, inode.hierarchy, hierarchicalWeights, child.attr, job.TotalRequest)
}

// updateHierarchicalShare updates the node attribute recursively
func (drf *drfPlugin) updateHierarchicalShare(node *hierarchicalNode,
	demandingResources map[v1.ResourceName]bool) {
	if node.children == nil {
		// leaf (job) node: just refresh its saturation state
		node.saturated = resourceSaturated(node.attr.allocated,
			node.request, demandingResources)
		klog.V(4).Infof("Update hierarchical node %s, share %f, dominant %s, resource %v, saturated: %t",
			node.hierarchy, node.attr.share, node.attr.dominantResource, node.attr.allocated, node.saturated)
	} else {
		var mdr float64 = 1
		// get minimum dominant resource share
		for _, child := range node.children {
			drf.updateHierarchicalShare(child, demandingResources)
			// skip empty child and saturated child
			if child.attr.share != 0 && !child.saturated {
				_, resShare := drf.calculateShare(child.attr.allocated, drf.totalResource)
				if resShare < mdr {
					mdr = resShare
				}
			}
		}

		node.attr.allocated = api.EmptyResource()
		saturated := true
		for _, child := range node.children {
			if !child.saturated {
saturated = false
			}
			// only consider non-empty children
			if child.attr.share != 0 {
				// saturated child is not scaled
				if child.saturated {
					t := child.attr.allocated
					node.attr.allocated.Add(t)
				} else {
					// scale the unsaturated child's allocation down to the
					// minimum dominant share before aggregating
					t := child.attr.allocated.Clone().Multi(mdr / child.attr.share)
					node.attr.allocated.Add(t)
				}
			}
		}
		node.attr.dominantResource, node.attr.share = drf.calculateShare(
			node.attr.allocated, drf.totalResource)
		// an inner node is saturated only when every child is saturated
		node.saturated = saturated
		klog.V(4).Infof("Update hierarchical node %s, share %f, dominant resource %s, resource %v, saturated: %t",
			node.hierarchy, node.attr.share, node.attr.dominantResource, node.attr.allocated, node.saturated)
	}
}

// UpdateHierarchicalShare inserts the job into the hierarchy (creating path
// nodes as needed) and then recomputes shares and saturation bottom-up.
func (drf *drfPlugin) UpdateHierarchicalShare(root *hierarchicalNode,
	totalAllocated *api.Resource, job *api.JobInfo, attr *drfAttr, hierarchy, hierarchicalWeights string) {
	// filter out demanding resources
	demandingResources := map[v1.ResourceName]bool{}
	for _, rn := range drf.totalResource.ResourceNames() {
		if totalAllocated.Get(rn) < drf.totalResource.Get(rn) {
			demandingResources[rn] = true
		}
	}
	drf.buildHierarchy(root, job, attr, hierarchy, hierarchicalWeights)
	drf.updateHierarchicalShare(root, demandingResources)
}

// updateJobShare recomputes the job's dominant share and publishes it to the
// metrics subsystem.
func (drf *drfPlugin) updateJobShare(jobNs, jobName string, attr *drfAttr) {
	drf.updateShare(attr)
	metrics.UpdateJobShare(jobNs, jobName, attr.share)
}

// updateShare refreshes attr's dominant resource and share against the
// cluster-wide total resource.
func (drf *drfPlugin) updateShare(attr *drfAttr) {
	attr.dominantResource, attr.share = drf.calculateShare(attr.allocated, drf.totalResource)
}

// calculateShare returns the dominant resource name and its share: the
// maximum allocated/total ratio over all resource names in totalResource.
func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) (string, float64) {
	res := float64(0)
	dominantResource := ""
	for _, rn := range totalResource.ResourceNames() {
		share := helpers.Share(allocated.Get(rn), totalResource.Get(rn))
		if share > res {
			res = share
			dominantResource = string(rn)
		}
	}
	return dominantResource, res
}

// OnSessionClose resets the per-session bookkeeping.
func (drf *drfPlugin) OnSessionClose(session *framework.Session) {
	// Clean schedule data.
drf.totalResource = api.EmptyResource()
	drf.totalAllocated = api.EmptyResource()
	drf.jobAttrs = map[api.JobID]*drfAttr{}
	// NOTE(review): namespaceOpts and hierarchicalRoot are not reset here —
	// presumably a fresh plugin instance is created per session; verify.
}