in pkg/scheduler/plugins/drf/drf.go [200:513]
// OnSessionOpen seeds the plugin's per-session DRF bookkeeping from the
// session snapshot and registers the DRF callbacks on ssn:
//
//   - a preemptable function (always);
//   - a queue order function and a reclaimable function (hierarchy enabled);
//   - a job order function (always);
//   - a namespace order function (namespace order enabled);
//   - allocate/deallocate event handlers that keep the shares up to date.
//
// A job's initial allocation is the sum of Resreq over its tasks whose status
// satisfies api.AllocatedStatus. Per-namespace allocations are the sum of the
// job allocations in that namespace.
func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
	// Prepare scheduling data for this session.
	// NOTE(review): totalResource is accumulated with Add, which assumes it is
	// reset between sessions (e.g. in OnSessionClose) — not visible here.
	drf.totalResource.Add(ssn.TotalResource)
	klog.V(4).Infof("Total Allocatable %s", drf.totalResource)

	namespaceOrderEnabled := drf.NamespaceOrderEnabled(ssn)
	hierarchyEnabled := drf.HierarchyEnabled(ssn)

	for _, job := range ssn.Jobs {
		attr := &drfAttr{
			allocated: api.EmptyResource(),
		}
		for status, tasks := range job.TaskStatusIndex {
			if api.AllocatedStatus(status) {
				for _, t := range tasks {
					attr.allocated.Add(t.Resreq)
				}
			}
		}

		// Calculate the init share of Job
		drf.updateJobShare(job.Namespace, job.Name, attr)
		drf.jobAttrs[job.UID] = attr

		if namespaceOrderEnabled {
			nsOpts, found := drf.namespaceOpts[job.Namespace]
			if !found {
				nsOpts = &drfAttr{
					allocated: api.EmptyResource(),
				}
				drf.namespaceOpts[job.Namespace] = nsOpts
			}
			// All tasks of a job share the job's namespace, so the whole job
			// allocation is attributed to it.
			nsOpts.allocated.Add(attr.allocated)
			drf.updateNamespaceShare(job.Namespace, nsOpts)
		}

		if hierarchyEnabled {
			queue := ssn.Queues[job.Queue]
			drf.totalAllocated.Add(attr.allocated)
			drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
		}
	}

	// preemptableFn returns the subset of preemptees DRF allows the preemptor
	// to evict. With namespace ordering enabled, preemptees from other
	// namespaces are judged first by weighted namespace share; preemptees that
	// policy cannot decide fall through to the job-level share comparison.
	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo

		if namespaceOrderEnabled {
			// Weighted share of the preemptor's namespace as if the preemptor
			// task were already allocated.
			lWeight := ssn.NamespaceInfo[api.NamespaceName(preemptor.Namespace)].GetWeight()
			lNsAtt := drf.namespaceOpts[preemptor.Namespace]
			lNsAlloc := lNsAtt.allocated.Clone().Add(preemptor.Resreq)
			_, lNsShare := drf.calculateShare(lNsAlloc, drf.totalResource)
			lNsShareWeighted := lNsShare / float64(lWeight)

			// Running allocation per preemptee namespace. Each preemptee seen
			// in a namespace is subtracted cumulatively, whether or not it
			// ends up selected as a victim.
			namespaceAllocation := map[string]*api.Resource{}
			// undecidedPreemptees holds preemptees this policy cannot judge;
			// they are left to the job-level policy below.
			undecidedPreemptees := []*api.TaskInfo{}
			for _, preemptee := range preemptees {
				if preemptor.Namespace == preemptee.Namespace {
					// Policy is disabled when both tasks are in the same namespace.
					undecidedPreemptees = append(undecidedPreemptees, preemptee)
					continue
				}

				// Compute the preemptee namespace weighted share after preemption.
				nsAllocation, found := namespaceAllocation[preemptee.Namespace]
				if !found {
					rNsAtt := drf.namespaceOpts[preemptee.Namespace]
					nsAllocation = rNsAtt.allocated.Clone()
					namespaceAllocation[preemptee.Namespace] = nsAllocation
				}
				rWeight := ssn.NamespaceInfo[api.NamespaceName(preemptee.Namespace)].GetWeight()
				rNsAlloc := nsAllocation.Sub(preemptee.Resreq)
				_, rNsShare := drf.calculateShare(rNsAlloc, drf.totalResource)
				rNsShareWeighted := rNsShare / float64(rWeight)

				// To avoid ping-pong preemption, the preemptee namespace must
				// still have the higher weighted share after preemption.
				if lNsShareWeighted < rNsShareWeighted {
					victims = append(victims, preemptee)
					continue
				}
				if lNsShareWeighted-rNsShareWeighted > shareDelta {
					continue
				}
				// Equal namespace order defers the decision to the job order.
				undecidedPreemptees = append(undecidedPreemptees, preemptee)
			}
			preemptees = undecidedPreemptees
		}

		// Job-level policy: a preemptee is a victim when the preemptor job's
		// share (with the new task) is below, or within shareDelta of, the
		// preemptee job's share (without the evicted task). As above, the
		// per-job allocation is reduced cumulatively across preemptees.
		latt := drf.jobAttrs[preemptor.Job]
		lalloc := latt.allocated.Clone().Add(preemptor.Resreq)
		_, ls := drf.calculateShare(lalloc, drf.totalResource)

		allocations := map[api.JobID]*api.Resource{}
		for _, preemptee := range preemptees {
			if _, found := allocations[preemptee.Job]; !found {
				ratt := drf.jobAttrs[preemptee.Job]
				allocations[preemptee.Job] = ratt.allocated.Clone()
			}
			ralloc := allocations[preemptee.Job].Sub(preemptee.Resreq)
			_, rs := drf.calculateShare(ralloc, drf.totalResource)

			if ls < rs || math.Abs(ls-rs) <= shareDelta {
				victims = append(victims, preemptee)
			}
		}

		klog.V(4).Infof("Victims from DRF plugins are %+v", victims)
		return victims, util.Permit
	}
	ssn.AddPreemptableFn(drf.Name(), preemptableFn)

	if hierarchyEnabled {
		// queueOrderFn normalizes compareQueues on the live hierarchy to -1/0/1.
		queueOrderFn := func(l interface{}, r interface{}) int {
			lv := l.(*api.QueueInfo)
			rv := r.(*api.QueueInfo)
			ret := drf.compareQueues(drf.hierarchicalRoot, lv, rv)
			if ret < 0 {
				return -1
			}
			if ret > 0 {
				return 1
			}
			return 0
		}
		ssn.AddQueueOrderFn(drf.Name(), queueOrderFn)

		// reclaimFn selects the reclaimees whose queue orders after the
		// reclaimer's queue (compareQueues < 0, the same sense queueOrderFn
		// maps to -1). The reclaimer task is counted as allocated and each
		// reclaimee task as released. All updates are applied to clones of the
		// hierarchy tree, totalAllocated and the job attrs. Each reclaimee's
		// change is undone before the next one is evaluated, so reclaimees are
		// judged independently. Ties are not reclaimed.
		reclaimFn := func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			var victims []*api.TaskInfo

			// clone hdrf tree
			totalAllocated := drf.totalAllocated.Clone()
			root := drf.hierarchicalRoot.Clone(nil)

			// update reclaimer hdrf
			ljob := ssn.Jobs[reclaimer.Job]
			lqueue := ssn.Queues[ljob.Queue]
			ljob = ljob.Clone()
			lattr := &drfAttr{
				allocated: drf.jobAttrs[ljob.UID].allocated.Clone(),
			}
			lattr.allocated.Add(reclaimer.Resreq)
			totalAllocated.Add(reclaimer.Resreq)
			drf.updateShare(lattr)
			drf.UpdateHierarchicalShare(root, totalAllocated, ljob, lattr, lqueue.Hierarchy, lqueue.Weights)

			for _, preemptee := range reclaimees {
				rjob := ssn.Jobs[preemptee.Job]
				rqueue := ssn.Queues[rjob.Queue]

				// update hdrf of reclaimee job
				totalAllocated.Sub(preemptee.Resreq)
				rjob = rjob.Clone()
				rattr := &drfAttr{
					allocated: drf.jobAttrs[rjob.UID].allocated.Clone(),
				}
				rattr.allocated.Sub(preemptee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				// compare hdrf of queues
				ret := drf.compareQueues(root, lqueue, rqueue)

				// resume hdrf of reclaimee job
				totalAllocated.Add(preemptee.Resreq)
				rattr.allocated.Add(preemptee.Resreq)
				drf.updateShare(rattr)
				drf.UpdateHierarchicalShare(root, totalAllocated, rjob, rattr, rqueue.Hierarchy, rqueue.Weights)

				if ret < 0 {
					victims = append(victims, preemptee)
				}
			}

			klog.V(4).Infof("Victims from HDRF plugins are %+v", victims)
			return victims, util.Permit
		}
		ssn.AddReclaimableFn(drf.Name(), reclaimFn)
	}

	// jobOrderFn orders jobs by ascending DRF share (lower share first).
	jobOrderFn := func(l interface{}, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)
		lShare := drf.jobAttrs[lv.UID].share
		rShare := drf.jobAttrs[rv.UID].share

		klog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %v, <%v/%v> share state: %v",
			lv.Namespace, lv.Name, lShare, rv.Namespace, rv.Name, rShare)

		if lShare == rShare {
			return 0
		}
		if lShare < rShare {
			return -1
		}
		return 1
	}
	ssn.AddJobOrderFn(drf.Name(), jobOrderFn)

	// namespaceOrderFn orders namespaces by ascending weighted share
	// (share / namespace weight) and publishes weight/share metrics.
	namespaceOrderFn := func(l interface{}, r interface{}) int {
		lv := l.(api.NamespaceName)
		rv := r.(api.NamespaceName)

		lOpt := drf.namespaceOpts[string(lv)]
		rOpt := drf.namespaceOpts[string(rv)]

		lWeight := ssn.NamespaceInfo[lv].GetWeight()
		rWeight := ssn.NamespaceInfo[rv].GetWeight()

		klog.V(4).Infof("DRF NamespaceOrderFn: <%v> share state: %f, weight %v, <%v> share state: %f, weight %v",
			lv, lOpt.share, lWeight, rv, rOpt.share, rWeight)

		lWeightedShare := lOpt.share / float64(lWeight)
		rWeightedShare := rOpt.share / float64(rWeight)

		metrics.UpdateNamespaceWeight(string(lv), lWeight)
		metrics.UpdateNamespaceWeight(string(rv), rWeight)
		metrics.UpdateNamespaceWeightedShare(string(lv), lWeightedShare)
		metrics.UpdateNamespaceWeightedShare(string(rv), rWeightedShare)

		if lWeightedShare == rWeightedShare {
			return 0
		}
		if lWeightedShare < rWeightedShare {
			return -1
		}
		return 1
	}
	if namespaceOrderEnabled {
		ssn.AddNamespaceOrderFn(drf.Name(), namespaceOrderFn)
	}

	// Register event handlers that keep job, namespace and hierarchical
	// shares in sync as tasks are allocated and deallocated in this session.
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Add(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			// -1 marks "namespace share not tracked" in the log line below.
			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Add(event.Task.Resreq)
				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}
			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]
				drf.totalAllocated.Add(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			klog.V(4).Infof("DRF AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
		DeallocateFunc: func(event *framework.Event) {
			attr := drf.jobAttrs[event.Task.Job]
			attr.allocated.Sub(event.Task.Resreq)

			job := ssn.Jobs[event.Task.Job]
			drf.updateJobShare(job.Namespace, job.Name, attr)

			// -1 marks "namespace share not tracked" in the log line below.
			nsShare := -1.0
			if namespaceOrderEnabled {
				nsOpt := drf.namespaceOpts[event.Task.Namespace]
				nsOpt.allocated.Sub(event.Task.Resreq)
				drf.updateNamespaceShare(event.Task.Namespace, nsOpt)
				nsShare = nsOpt.share
			}
			if hierarchyEnabled {
				queue := ssn.Queues[job.Queue]
				drf.totalAllocated.Sub(event.Task.Resreq)
				drf.UpdateHierarchicalShare(drf.hierarchicalRoot, drf.totalAllocated, job, attr, queue.Hierarchy, queue.Weights)
			}

			klog.V(4).Infof("DRF DeallocateFunc: task <%v/%v>, resreq <%v>, share <%v>, namespace share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare)
		},
	})
}