in pkg/scheduler/plugins/proportion/proportion.go [76:363]
func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
	// Prepare scheduling data for this session.
	pp.totalResource.Add(ssn.TotalResource)

	klog.V(4).Infof("The total resource is <%v>", pp.totalResource)
	for _, queue := range ssn.Queues {
		if len(queue.Queue.Spec.Guarantee.Resource) == 0 {
			continue
		}
		guarantee := api.NewResource(queue.Queue.Spec.Guarantee.Resource)
		pp.totalGuarantee.Add(guarantee)
	}
	klog.V(4).Infof("The total guarantee resource is <%v>", pp.totalGuarantee)
	// Build attributes for Queues.
	for _, job := range ssn.Jobs {
		klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name)
		if _, found := pp.queueOpts[job.Queue]; !found {
			queue := ssn.Queues[job.Queue]
			attr := &queueAttr{
				queueID: queue.UID,
				name:    queue.Name,
				weight:  queue.Weight,

				deserved:  api.EmptyResource(),
				allocated: api.EmptyResource(),
				request:   api.EmptyResource(),
				inqueue:   api.EmptyResource(),
				guarantee: api.EmptyResource(),
			}
			if len(queue.Queue.Spec.Capability) != 0 {
				attr.capability = api.NewResource(queue.Queue.Spec.Capability)
				if attr.capability.MilliCPU <= 0 {
					attr.capability.MilliCPU = math.MaxFloat64
				}
				if attr.capability.Memory <= 0 {
					attr.capability.Memory = math.MaxFloat64
				}
			}
			if len(queue.Queue.Spec.Guarantee.Resource) != 0 {
				attr.guarantee = api.NewResource(queue.Queue.Spec.Guarantee.Resource)
			}
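			// A queue's real capability is the cluster total minus the guarantees
			// reserved for all other queues (totalGuarantee already includes this
			// queue's own guarantee, so it is added back). An explicit capability,
			// if set, further caps this value.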
			realCapability := pp.totalResource.Clone().Sub(pp.totalGuarantee).Add(attr.guarantee)
			if attr.capability == nil {
				attr.realCapability = realCapability
			} else {
				attr.realCapability = helpers.Min(realCapability, attr.capability)
			}
			pp.queueOpts[job.Queue] = attr
			klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue)
		}
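		// Accumulate per-queue usage: `allocated` counts the resources of tasks
		// that already hold an allocation; `request` additionally counts pending
		// tasks, i.e. everything the queue would like to run.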
		attr := pp.queueOpts[job.Queue]
		for status, tasks := range job.TaskStatusIndex {
			if api.AllocatedStatus(status) {
				for _, t := range tasks {
					attr.allocated.Add(t.Resreq)
					attr.request.Add(t.Resreq)
				}
			} else if status == api.Pending {
				for _, t := range tasks {
					attr.request.Add(t.Resreq)
				}
			}
		}

		if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
			attr.inqueue.Add(job.GetMinResources())
		}
		// Calculate the inqueue resource for running jobs.
		// The check 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember'
		// covers cases such as a Spark job whose driver pod has completed while
		// the PodGroup stays in the Running phase: without it, the resource
		// already released by the finished pods would be reserved again.
		if job.PodGroup.Status.Phase == scheduling.PodGroupRunning &&
			job.PodGroup.Spec.MinResources != nil &&
			job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember {
			allocated := util.GetAllocatedResource(job)
			inqueued := util.GetInqueueResource(job, allocated)
			attr.inqueue.Add(inqueued)
		}
	}

	// Record metrics
	for _, attr := range pp.queueOpts {
		metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)
		metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory)
		metrics.UpdateQueueWeight(attr.name, attr.weight)
		queue := ssn.Queues[attr.queueID]
		metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue)
		metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending)
		metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running)
		metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown)
	}

	remaining := pp.totalResource.Clone()
	meet := map[api.QueueID]struct{}{}
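	// Iteratively divide the remaining cluster resource among the queues that
	// are not yet satisfied, in proportion to their weights. The loop ends when
	// every queue is met, the remaining resource is exhausted, or a full pass
	// makes no progress.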
	for {
		totalWeight := int32(0)
		for _, attr := range pp.queueOpts {
			if _, found := meet[attr.queueID]; found {
				continue
			}
			totalWeight += attr.weight
		}

		// If all queues have been met, exit the loop.
		if totalWeight == 0 {
			klog.V(4).Infof("Exiting when total weight is 0")
			break
		}

		oldRemaining := remaining.Clone()
		// Calculate the deserved resource of each queue.
		// increasedDeserved is the total increase of attr.deserved across processed queues;
		// decreasedDeserved is the total decrease of attr.deserved across processed queues.
		increasedDeserved := api.EmptyResource()
		decreasedDeserved := api.EmptyResource()
		for _, attr := range pp.queueOpts {
			klog.V(4).Infof("Considering Queue <%s>: weight <%d>, total weight <%d>.",
				attr.name, attr.weight, totalWeight)
			if _, found := meet[attr.queueID]; found {
				continue
			}

			oldDeserved := attr.deserved.Clone()
			attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight)))
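			// Example (hypothetical numbers): with 12 remaining CPUs and two unmet
			// queues of weight 2 and 1, the first queue's deserved grows by 8 CPUs
			// and the second's by 4, before the capability and request clamps below.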
			if attr.realCapability != nil {
				attr.deserved.MinDimensionResource(attr.realCapability, api.Infinity)
			}
			attr.deserved.MinDimensionResource(attr.request, api.Zero)

			klog.V(4).Infof("Clamped queue <%s> deserved resource to <%v>", attr.name, attr.deserved)
			if attr.request.LessEqual(attr.deserved, api.Zero) {
				meet[attr.queueID] = struct{}{}
				klog.V(4).Infof("queue <%s> is met", attr.name)
			} else if reflect.DeepEqual(attr.deserved, oldDeserved) {
				meet[attr.queueID] = struct{}{}
				klog.V(4).Infof("queue <%s> is met because of its capability", attr.name)
			}
			attr.deserved = helpers.Max(attr.deserved, attr.guarantee)
			pp.updateShare(attr)
			klog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, realCapability <%v>, allocated <%v>, request <%v>, share <%0.2f>",
				attr.name, attr.deserved, attr.realCapability, attr.allocated, attr.request, attr.share)

			increased, decreased := attr.deserved.Diff(oldDeserved, api.Zero)
			increasedDeserved.Add(increased)
			decreasedDeserved.Add(decreased)

			// Record metrics
			metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory)
		}

		remaining.Sub(increasedDeserved).Add(decreasedDeserved)
		klog.V(4).Infof("Remaining resource is <%s>", remaining)
		if remaining.IsEmpty() || reflect.DeepEqual(remaining, oldRemaining) {
			klog.V(4).Infof("Exiting when remaining is empty or no queue requests more resource: <%v>", remaining)
			break
		}
	}
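	// Queues are ordered by share (allocated relative to deserved, maintained by
	// updateShare); the queue with the lowest share is scheduled first.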
	ssn.AddQueueOrderFn(pp.Name(), func(l, r interface{}) int {
		lv := l.(*api.QueueInfo)
		rv := r.(*api.QueueInfo)

		if pp.queueOpts[lv.UID].share == pp.queueOpts[rv.UID].share {
			return 0
		}

		if pp.queueOpts[lv.UID].share < pp.queueOpts[rv.UID].share {
			return -1
		}

		return 1
	})
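	// A reclaimee is a victim only while its queue's simulated allocation still
	// exceeds the queue's deserved resource; the simulation walks the candidates
	// and subtracts each chosen victim's request as it goes.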
	ssn.AddReclaimableFn(pp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo
		allocations := map[api.QueueID]*api.Resource{}

		for _, reclaimee := range reclaimees {
			job := ssn.Jobs[reclaimee.Job]
			attr := pp.queueOpts[job.Queue]

			if _, found := allocations[job.Queue]; !found {
				allocations[job.Queue] = attr.allocated.Clone()
			}
			allocated := allocations[job.Queue]
			if allocated.LessPartly(reclaimer.Resreq, api.Zero) {
				klog.V(3).Infof("Failed to allocate resource for Task <%s/%s> in Queue <%s>, not enough resource.",
					reclaimee.Namespace, reclaimee.Name, job.Queue)
				continue
			}

			if !allocated.LessEqual(attr.deserved, api.Zero) {
				allocated.Sub(reclaimee.Resreq)
				victims = append(victims, reclaimee)
			}
		}
		klog.V(4).Infof("Victims from proportion plugin are %+v", victims)
		return victims, util.Permit
	})
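	// A queue is overused once its allocation covers its deserved resource in
	// every dimension; overused queues are skipped when dispatching new tasks.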
	ssn.AddOverusedFn(pp.Name(), func(obj interface{}) bool {
		queue := obj.(*api.QueueInfo)
		attr := pp.queueOpts[queue.UID]

		overused := attr.deserved.LessEqual(attr.allocated, api.Zero)
		metrics.UpdateQueueOverused(attr.name, overused)
		if overused {
			klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>, share <%v>",
				queue.Name, attr.deserved, attr.allocated, attr.share)
		}

		return overused
	})
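	// A candidate task is allocatable only if its request fits within the
	// queue's free share, i.e. deserved minus allocated.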
	ssn.AddAllocatableFn(pp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
		attr := pp.queueOpts[queue.UID]

		free, _ := attr.deserved.Diff(attr.allocated, api.Zero)
		allocatable := candidate.Resreq.LessEqual(free, api.Zero)
		if !allocatable {
			klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>; Candidate <%v>: resource request <%v>",
				queue.Name, attr.deserved, attr.allocated, candidate.Name, candidate.Resreq)
		}

		return allocatable
	})
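	// Gate PodGroup admission: a job may move to Inqueue only if its minimal
	// resource request, plus what the queue has already allocated and admitted,
	// still fits within the queue's real capability.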
	ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {
		job := obj.(*api.JobInfo)
		queueID := job.Queue
		attr := pp.queueOpts[queueID]
		queue := ssn.Queues[queueID]
		// If no capability is set, always enqueue the job.
		if attr.realCapability == nil {
			klog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.",
				queue.Name, job.Namespace, job.Name)
			return util.Permit
		}

		if job.PodGroup.Spec.MinResources == nil {
			klog.V(4).Infof("job %s MinResources is nil.", job.Name)
			return util.Permit
		}
		minReq := job.GetMinResources()

		klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s>",
			job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String())
		// Permit the job if the queue's resource quota has not been reached.
		inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).LessEqual(attr.realCapability, api.Infinity)
		klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
		if inqueue {
			attr.inqueue.Add(job.GetMinResources())
			return util.Permit
		}
		ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient")
		return util.Reject
	})
	// Register event handlers.
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			job := ssn.Jobs[event.Task.Job]
			attr := pp.queueOpts[job.Queue]
			attr.allocated.Add(event.Task.Resreq)
			metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)

			pp.updateShare(attr)

			klog.V(4).Infof("Proportion AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
		},
		DeallocateFunc: func(event *framework.Event) {
			job := ssn.Jobs[event.Task.Job]
			attr := pp.queueOpts[job.Queue]
			attr.allocated.Sub(event.Task.Resreq)
			metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory)

			pp.updateShare(attr)

			klog.V(4).Infof("Proportion DeallocateFunc: task <%v/%v>, resreq <%v>, share <%v>",
				event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share)
		},
	})
}
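
// fairShareSketch is an illustrative sketch, not part of the plugin: the
// divide-remaining loop in OnSessionOpen, reduced to a single scalar resource.
// All names and parameters here are hypothetical. Each pass splits the
// remaining capacity among unmet queues by weight, caps every queue at its
// request, and stops once nothing changes or the capacity is used up.
func fairShareSketch(remaining float64, weights, requests []float64) []float64 {
	deserved := make([]float64, len(weights))
	met := make([]bool, len(weights))
	for {
		// Sum the weights of queues that still want more resource.
		totalWeight := 0.0
		for i := range weights {
			if !met[i] {
				totalWeight += weights[i]
			}
		}
		if totalWeight == 0 {
			break // every queue is satisfied
		}
		granted := 0.0
		for i := range weights {
			if met[i] {
				continue
			}
			share := remaining * weights[i] / totalWeight
			if deserved[i]+share >= requests[i] {
				// The weighted share overshoots the request: clamp it, and mark
				// the queue as met so later passes redistribute the surplus.
				share = requests[i] - deserved[i]
				met[i] = true
			}
			deserved[i] += share
			granted += share
		}
		remaining -= granted
		if remaining <= 0 || granted == 0 {
			break
		}
	}
	return deserved
}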