in pkg/scheduler/plugins/task-topology/topology.go [135:188]
// calcBucketScore scores how well node suits task from the point of view of
// the task's topology bucket, and returns that score together with the
// JobManager registered for the task's job. The returned error is always nil.
//
// Early exits:
//   - (0, nil, nil) when the task has a resource request and LessPartly
//     reports the node's idle+releasing resources as short of it;
//   - (0, nil, nil) when no JobManager is registered for task.Job;
//   - (0, manager, nil) when the manager has no bucket for the task.
//
// Otherwise the score is built from three parts: the bucket's counter for this
// node, a penalty when checkTaskSetAffinity returns a negative value for the
// tasks recorded on the node, and the number of tasks in the bucket, reduced
// by one for every task that has to be taken out before the bucket's request
// fits the node.
func (p *taskTopologyPlugin) calcBucketScore(task *api.TaskInfo, node *api.NodeInfo) (int, *JobManager, error) {
	// The most the node can offer: what is idle now plus what is being released.
	capacity := node.Idle.Clone().Add(node.Releasing)
	if task.Resreq != nil && capacity.LessPartly(task.Resreq, api.Zero) {
		// The task on its own can never fit this node.
		return 0, nil, nil
	}

	mgr, found := p.managers[task.Job]
	if !found {
		return 0, nil, nil
	}

	bucket := mgr.GetBucket(task)
	if bucket == nil {
		// The task does not belong to any bucket.
		return 0, mgr, nil
	}

	// Step 1: the bucket's per-node counter is the base score of this node
	// (presumably the bucket's tasks already bound here — confirm in JobManager).
	score := bucket.node[node.Name]

	// Step 2: apply inter/self anti-affinity; only a negative result changes
	// the score, a non-negative result is ignored here.
	if placed := mgr.nodeTaskSet[node.Name]; placed != nil {
		if penalty := mgr.checkTaskSetAffinity(getTaskName(task), placed, true); penalty < 0 {
			score += penalty
		}
	}

	memberCount := len(bucket.tasks)
	klog.V(4).Infof("task %s/%s, node %s, additional score %d, task %d",
		task.Namespace, task.Name, node.Name, score, memberCount)

	// Step 3: every task in the bucket counts towards the score...
	score += memberCount

	// ...unless the bucket as a whole does not fit the node. In that case
	// tasks are taken out one by one until the remaining request fits, and
	// each task taken out costs one point.
	if bucket.request != nil && !bucket.request.LessEqual(capacity, api.Zero) {
		pending := bucket.request.Clone()
		// Map iteration order is unspecified, so which tasks are taken out
		// (and therefore how many) may differ from call to call.
		for uid, member := range bucket.tasks {
			// Keep the task being scored, and skip tasks without a request.
			if uid != task.Pod.UID && member.Resreq != nil {
				pending.Sub(member.Resreq)
				score--
				if pending.LessEqual(capacity, api.Zero) {
					break
				}
			}
		}
	}

	return score, mgr, nil
}