func()

in pkg/scheduler/plugins/task-topology/topology.go [135:188]


// calcBucketScore scores node for task according to the bucket the task
// belongs to. It returns the score together with the job manager registered
// for task.Job; the manager is nil when the task cannot fit the node or when
// no manager exists for the job. The returned error is always nil.
func (p *taskTopologyPlugin) calcBucketScore(task *api.TaskInfo, node *api.NodeInfo) (int, *JobManager, error) {
	// Upper bound of what the node can offer: idle plus releasing resources.
	capacity := node.Idle.Clone().Add(node.Releasing)
	if task.Resreq != nil && capacity.LessPartly(task.Resreq, api.Zero) {
		// The task cannot fit even within that upper bound.
		return 0, nil, nil
	}

	mgr, found := p.managers[task.Job]
	if !found {
		return 0, nil, nil
	}

	bkt := mgr.GetBucket(task)
	if bkt == nil {
		// The task does not belong to any bucket.
		return 0, mgr, nil
	}

	// Step 1: start from the value the bucket records for this node.
	total := bkt.node[node.Name]

	// Step 2: apply the inter/self anti-affinity result, but only when it is
	// a penalty (negative); non-negative results are ignored here.
	if placed := mgr.nodeTaskSet[node.Name]; placed != nil {
		if penalty := mgr.checkTaskSetAffinity(getTaskName(task), placed, true); penalty < 0 {
			total += penalty
		}
	}

	memberCount := len(bkt.tasks)
	klog.V(4).Infof("task %s/%s, node %s, additional score %d, task %d",
		task.Namespace, task.Name, node.Name, total, memberCount)

	// Step 3: credit every task in the bucket, then take credit back for the
	// tasks that have to be dropped so the bucket fits the node.
	total += memberCount
	bucketFits := bkt.request == nil || bkt.request.LessEqual(capacity, api.Zero)
	if bucketFits {
		return total, mgr, nil
	}

	left := bkt.request.Clone()
	// Map iteration order is unspecified, so which tasks get dropped is
	// effectively arbitrary.
	for uid, member := range bkt.tasks {
		// The task being scored always stays in the bucket.
		if uid == task.Pod.UID {
			continue
		}
		// Tasks without a recorded request are skipped.
		if member.Resreq == nil {
			continue
		}
		left.Sub(member.Resreq)
		total--
		if left.LessEqual(capacity, api.Zero) {
			break
		}
	}
	// The original code expects the remaining bucket request to fit the
	// node's capacity at this point.
	return total, mgr, nil
}