func()

in pkg/scheduler/plugins/drf/drf.go [200:513]


// OnSessionOpen initialises the DRF bookkeeping for a scheduling session and
// registers the plugin's callbacks with ssn.
//
// It:
//   - adds ssn.TotalResource to drf.totalResource, which is the denominator
//     passed to calculateShare for every share computed below;
//   - sums, for every job in the session, the requests of its tasks that are in
//     an allocated status, and records the job's share in drf.jobAttrs;
//   - when namespace ordering is enabled, aggregates the same allocation per
//     namespace in drf.namespaceOpts;
//   - when hierarchy is enabled, adds each job's allocation to
//     drf.totalAllocated and updates the tree rooted at drf.hierarchicalRoot;
//   - registers the preemptable and job-order functions, the namespace-order
//     function (namespace ordering only), the queue-order and reclaimable
//     functions (hierarchy only), and allocate/deallocate event handlers that
//     keep all of the above in sync as tasks are (de)allocated.
func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
	// Prepare scheduling data for this session.
	drf.totalResource.Add(ssn.TotalResource)

	klog.V(4).Infof("Total Allocatable %s", drf.totalResource)

	namespaceOrderEnabled := drf.NamespaceOrderEnabled(ssn)
	hierarchyEnabled := drf.HierarchyEnabled(ssn)

	for _, job := range ssn.Jobs {
		attr := &drfAttr{
			allocated: api.EmptyResource(),
		}

		// Only tasks in an allocated status count toward the job's share.
		for status, tasks := range job.TaskStatusIndex {
			if api.AllocatedStatus(status) {
				for _, t := range tasks {
					attr.allocated.Add(t.Resreq)
				}
			}
		}

		// Calculate the init share of Job
		drf.updateJobShare(job.Namespace, job.Name, attr)

		drf.jobAttrs[job.UID] = attr

		if namespaceOrderEnabled {
			nsOpts, found := drf.namespaceOpts[job.Namespace]
			if !found {
				nsOpts = &drfAttr{
					allocated: api.EmptyResource(),
				}
				drf.namespaceOpts[job.Namespace] = nsOpts
			}
			// all task in job should have the same namespace with job
			nsOpts.allocated.Add(attr.allocated)
			drf.updateNamespaceShare(job.Namespace, nsOpts)
		}
		if hierarchyEnabled {
			queue := ssn.Queues[job.Queue]
			drf.totalAllocated.Add(attr.allocated)
			drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
		}
	}

	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo

		addVictim := func(candidate *api.TaskInfo) {
			victims = append(victims, candidate)
		}

		if namespaceOrderEnabled {
			// apply the namespace share policy on preemptee firstly

			// Weighted share of the preemptor's namespace as if the preemptor
			// had already been allocated (computed on a clone).
			lWeight := ssn.NamespaceInfo[api.NamespaceName(preemptor.Namespace)].GetWeight()
			lNsAtt := drf.namespaceOpts[preemptor.Namespace]
			lNsAlloc := lNsAtt.allocated.Clone().Add(preemptor.Resreq)
			_, lNsShare := drf.calculateShare(lNsAlloc, drf.totalResource)
			lNsShareWeighted := lNsShare / float64(lWeight)

			// Per-namespace scratch copies of the allocation. Each preemptee's
			// request is subtracted from its namespace's copy in place, so
			// later preemptees of the same namespace are judged against the
			// cumulatively reduced allocation.
			namespaceAllocation := map[string]*api.Resource{}

			// undecidedPreemptees means this policy could not judge preemptee is preemptable or not
			// and left it to next policy
			undecidedPreemptees := []*api.TaskInfo{}

			for _, preemptee := range preemptees {
				if preemptor.Namespace == preemptee.Namespace {
					// policy is disabled when they are in the same namespace
					undecidedPreemptees = append(undecidedPreemptees, preemptee)
					continue
				}

				// compute the preemptee namespace weighted share after preemption
				nsAllocation, found := namespaceAllocation[preemptee.Namespace]
				if !found {
					rNsAtt := drf.namespaceOpts[preemptee.Namespace]
					nsAllocation = rNsAtt.allocated.Clone()
					namespaceAllocation[preemptee.Namespace] = nsAllocation
				}
				rWeight := ssn.NamespaceInfo[api.NamespaceName(preemptee.Namespace)].GetWeight()
				rNsAlloc := nsAllocation.Sub(preemptee.Resreq)
				_, rNsShare := drf.calculateShare(rNsAlloc, drf.totalResource)
				rNsShareWeighted := rNsShare / float64(rWeight)

				// to avoid ping pong actions, the preemptee namespace should
				// have the higher weighted share after preemption.
				if lNsShareWeighted < rNsShareWeighted {
					addVictim(preemptee)
					continue
				}
				// Clearly higher share on the preemptor side: reject outright.
				if lNsShareWeighted-rNsShareWeighted > shareDelta {
					continue
				}

				// equal namespace order leads to judgement of jobOrder
				undecidedPreemptees = append(undecidedPreemptees, preemptee)
			}

			preemptees = undecidedPreemptees
		}

		// Job-level DRF: preemptor's share if it were allocated, compared with
		// each preemptee job's share after losing the preemptee's request.
		latt := drf.jobAttrs[preemptor.Job]
		lalloc := latt.allocated.Clone().Add(preemptor.Resreq)
		_, ls := drf.calculateShare(lalloc, drf.totalResource)

		// Per-job scratch copies, reduced in place (cumulatively) like
		// namespaceAllocation above.
		allocations := map[api.JobID]*api.Resource{}

		for _, preemptee := range preemptees {
			if _, found := allocations[preemptee.Job]; !found {
				ratt := drf.jobAttrs[preemptee.Job]
				allocations[preemptee.Job] = ratt.allocated.Clone()
			}
			ralloc := allocations[preemptee.Job].Sub(preemptee.Resreq)
			_, rs := drf.calculateShare(ralloc, drf.totalResource)

			if ls < rs || math.Abs(ls-rs) <= shareDelta {
				addVictim(preemptee)
			}
		}

		klog.V(4).Infof("Victims from DRF plugins are %+v", victims)

		return victims, util.Permit
	}

	ssn.AddPreemptableFn(drf.Name(), preemptableFn)

	if hierarchyEnabled {
		queueOrderFn := func(l interface{}, r interface{}) int {
			lv := l.(*api.QueueInfo)
			rv := r.(*api.QueueInfo)
			ret := drf.compareQueues(drf.hierarchicalRoot, lv, rv)
			if ret < 0 {
				return -1
			}
			if ret > 0 {
				return 1
			}
			return 0
		}
		ssn.AddQueueOrderFn(drf.Name(), queueOrderFn)

		reclaimFn := func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			var victims []*api.TaskInfo

			// Simulate on clones of the hdrf tree and the running total so the
			// session's real bookkeeping is never modified here.
			totalAllocated := drf.totalAllocated.Clone()
			root := drf.hierarchicalRoot.Clone(nil)

			// Update reclaimer hdrf as if reclaimer.Resreq were already granted.
			ljob := ssn.Jobs[reclaimer.Job]
			lqueue := ssn.Queues[ljob.Queue]
			ljob = ljob.Clone()
			lattr := &drfAttr{
				allocated: drf.jobAttrs[ljob.UID].allocated.Clone(),
			}
			lattr.allocated.Add(reclaimer.Resreq)
			totalAllocated.Add(reclaimer.Resreq)
			drf.updateShare(lattr)
			drf.UpdateHierarchicalShare(root, totalAllocated, ljob, lattr, lqueue.Hierarchy, lqueue.Weights)

			// Each reclaimee is evaluated independently: its removal is applied,
			// the queues are compared, and the removal is undone before the
			// next reclaimee is considered.
			for _, reclaimee := range reclaimees {
				rjob := ssn.Jobs[reclaimee.Job]
				rqueue := ssn.Queues[rjob.Queue]

				// Update hdrf of the reclaimee job without reclaimee.Resreq.
				totalAllocated.Sub(reclaimee.Resreq)
				rjob = rjob.Clone()
				rattr := &drfAttr{
					allocated: drf.jobAttrs[rjob.UID].allocated.Clone(),
				}
				rattr.allocated.Sub(reclaimee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				// Compare hdrf of queues.
				ret := drf.compareQueues(root, lqueue, rqueue)

				// Resume hdrf of the reclaimee job.
				totalAllocated.Add(reclaimee.Resreq)
				rattr.allocated.Add(reclaimee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				// A negative result means compareQueues still orders the
				// reclaimer's queue before the reclaimee's (same convention as
				// queueOrderFn), so the reclaimee may be reclaimed. No
				// shareDelta tolerance is applied here.
				if ret < 0 {
					victims = append(victims, reclaimee)
				}
			}

			klog.V(4).Infof("Victims from HDRF plugins are %+v", victims)

			return victims, util.Permit
		}
		ssn.AddReclaimableFn(drf.Name(), reclaimFn)
	}

	// Jobs with a smaller share are ordered first.
	jobOrderFn := func(l interface{}, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		lShare := drf.jobAttrs[lv.UID].share
		rShare := drf.jobAttrs[rv.UID].share

		klog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %v, <%v/%v> share state: %v",
			lv.Namespace, lv.Name, lShare, rv.Namespace, rv.Name, rShare)

		switch {
		case lShare == rShare:
			return 0
		case lShare < rShare:
			return -1
		default:
			return 1
		}
	}

	ssn.AddJobOrderFn(drf.Name(), jobOrderFn)

	if namespaceOrderEnabled {
		// Namespaces with a smaller weighted share (share / weight) are ordered
		// first. The comparator also publishes weight and weighted-share metrics.
		namespaceOrderFn := func(l interface{}, r interface{}) int {
			lv := l.(api.NamespaceName)
			rv := r.(api.NamespaceName)

			lOpt := drf.namespaceOpts[string(lv)]
			rOpt := drf.namespaceOpts[string(rv)]

			lWeight := ssn.NamespaceInfo[lv].GetWeight()
			rWeight := ssn.NamespaceInfo[rv].GetWeight()

			klog.V(4).Infof("DRF NamespaceOrderFn: <%v> share state: %f, weight %v, <%v> share state: %f, weight %v",
				lv, lOpt.share, lWeight, rv, rOpt.share, rWeight)

			lWeightedShare := lOpt.share / float64(lWeight)
			rWeightedShare := rOpt.share / float64(rWeight)

			metrics.UpdateNamespaceWeight(string(lv), lWeight)
			metrics.UpdateNamespaceWeight(string(rv), rWeight)
			metrics.UpdateNamespaceWeightedShare(string(lv), lWeightedShare)
			metrics.UpdateNamespaceWeightedShare(string(rv), rWeightedShare)

			if lWeightedShare == rWeightedShare {
				return 0
			}

			if lWeightedShare < rWeightedShare {
				return -1
			}

			return 1
		}

		ssn.AddNamespaceOrderFn(drf.Name(), namespaceOrderFn)
	}

	// Register event handlers that keep job, namespace and hierarchical shares
	// in sync with allocations made or undone during the session.
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Add(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Add(event.Task.Resreq)

				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}
			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]

				drf.totalAllocated.Add(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			klog.V(4).Infof("DRF AllocateFunc: task <%v/%v>, resreq <%v>,  share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
		DeallocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Sub(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Sub(event.Task.Resreq)

				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}

			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]
				drf.totalAllocated.Sub(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			klog.V(4).Infof("DRF DeallocateFunc: task <%v/%v>, resreq <%v>,  share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
	})
}