pkg/scheduler/plugins/task-topology/topology.go (264 lines of code) (raw):

/* Copyright 2021 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package tasktopology import ( "fmt" "strings" "time" "k8s.io/klog" k8sFramework "k8s.io/kubernetes/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" ) type taskTopologyPlugin struct { arguments framework.Arguments weight int managers map[api.JobID]*JobManager } // New function returns taskTopologyPlugin object func New(arguments framework.Arguments) framework.Plugin { return &taskTopologyPlugin{ arguments: arguments, weight: calculateWeight(arguments), managers: make(map[api.JobID]*JobManager), } } func (p *taskTopologyPlugin) Name() string { return PluginName } // TaskOrderFn returns -1 to make l prior to r. // // for example: // A: // | bucket1 | bucket2 | out of bucket // | a1 a3 | a2 | a4 // B: // | bucket1 | out of bucket // | b1 b2 | b3 // the right task order should be: // a1 a3 a2 b1 b2 a4 b3 func (p *taskTopologyPlugin) TaskOrderFn(l interface{}, r interface{}) int { lv, ok := l.(*api.TaskInfo) if !ok { klog.Errorf("Object is not a taskinfo") } rv, ok := r.(*api.TaskInfo) if !ok { klog.Errorf("Object is not a taskinfo") } lvJobManager := p.managers[lv.Job] rvJobManager := p.managers[rv.Job] var lvBucket, rvBucket *Bucket if lvJobManager != nil { lvBucket = lvJobManager.GetBucket(lv) } else { klog.V(4).Infof("No job manager for job <ID: %s>, do not return task order.", lv.Job) return 0 } if rvJobManager != nil { rvBucket = rvJobManager.GetBucket(rv) } else { klog.V(4).Infof("No job manager for job <ID: %s>, do not return task order.", rv.Job) return 0 } // the one have bucket would always prior to another lvInBucket := lvBucket != nil rvInBucket := rvBucket != nil if lvInBucket != rvInBucket { if lvInBucket { return -1 } return 1 } // comparison between job is not the duty of this plugin if lv.Job != rv.Job { return 0 } // task out of bucket have no order if !lvInBucket && !rvInBucket { return 0 } // the big bucket should prior to small one lvHasTask := len(lvBucket.tasks) rvHasTask := len(rvBucket.tasks) if lvHasTask != rvHasTask { if lvHasTask > rvHasTask { return -1 } return 1 } lvBucketIndex := lvBucket.index rvBucketIndex := rvBucket.index // in the same bucket, the affinityOrder is ok if lvBucketIndex == rvBucketIndex { affinityOrder := lvJobManager.taskAffinityOrder(lv, rv) return -affinityOrder } // the old bucket should prior to young one if lvBucketIndex < rvBucketIndex { return -1 } return 1 } func (p *taskTopologyPlugin) calcBucketScore(task *api.TaskInfo, node *api.NodeInfo) (int, *JobManager, error) { // task could never fits the node maxResource := node.Idle.Clone().Add(node.Releasing) if req := task.Resreq; req != nil && maxResource.LessPartly(req, api.Zero) { return 0, nil, nil } jobManager, hasManager := p.managers[task.Job] if !hasManager { return 0, nil, nil } bucket := jobManager.GetBucket(task) // task out of bucket if bucket == nil { return 0, jobManager, nil } // 1. bound task in bucket is the base score of this node score := bucket.node[node.Name] // 2. task inter/self anti-affinity should be calculated if nodeTaskSet := jobManager.nodeTaskSet[node.Name]; nodeTaskSet != nil { taskName := getTaskName(task) affinityScore := jobManager.checkTaskSetAffinity(taskName, nodeTaskSet, true) if affinityScore < 0 { score += affinityScore } } klog.V(4).Infof("task %s/%s, node %s, additional score %d, task %d", task.Namespace, task.Name, node.Name, score, len(bucket.tasks)) // 3. the other tasks in bucket take into considering score += len(bucket.tasks) if bucket.request == nil || bucket.request.LessEqual(maxResource, api.Zero) { return score, jobManager, nil } remains := bucket.request.Clone() // randomly (by map) take out task to make the bucket fits the node for bucketTaskID, bucketTask := range bucket.tasks { // current task should kept in bucket if bucketTaskID == task.Pod.UID || bucketTask.Resreq == nil { continue } remains.Sub(bucketTask.Resreq) score-- if remains.LessEqual(maxResource, api.Zero) { break } } // here, the bucket remained request will always fit the maxResource return score, jobManager, nil } func (p *taskTopologyPlugin) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { score, jobManager, err := p.calcBucketScore(task, node) if err != nil { return 0, err } fScore := float64(score * p.weight) if jobManager != nil && jobManager.bucketMaxSize != 0 { fScore = fScore * float64(k8sFramework.MaxNodeScore) / float64(jobManager.bucketMaxSize) } klog.V(4).Infof("task %s/%s at node %s has bucket score %d, score %f", task.Namespace, task.Name, node.Name, score, fScore) return fScore, nil } func (p *taskTopologyPlugin) AllocateFunc(event *framework.Event) { task := event.Task jobManager, hasManager := p.managers[task.Job] if !hasManager { return } jobManager.TaskBound(task) } func (p *taskTopologyPlugin) initBucket(ssn *framework.Session) { for jobID, job := range ssn.Jobs { if noPendingTasks(job) { klog.V(4).Infof("No pending tasks in job <%s/%s> by plugin %s.", job.Namespace, job.Name, PluginName) continue } jobTopology, err := readTopologyFromPgAnnotations(job) if err != nil { klog.V(4).Infof("Failed to read task topology from job <%s/%s> annotations, error: %s.", job.Namespace, job.Name, err.Error()) continue } if jobTopology == nil { continue } manager := NewJobManager(jobID) manager.ApplyTaskTopology(jobTopology) manager.ConstructBucket(job.Tasks) p.managers[job.UID] = manager } } func affinityCheck(job *api.JobInfo, affinity [][]string) error { if job == nil || affinity == nil { return fmt.Errorf("empty input, job: %v, affinity: %v", job, affinity) } var taskNumber = len(job.Tasks) var taskRef = make(map[string]bool, taskNumber) for _, task := range job.Tasks { tmpStrings := strings.Split(task.Name, "-") if _, exist := taskRef[tmpStrings[len(tmpStrings)-2]]; !exist { taskRef[tmpStrings[len(tmpStrings)-2]] = true } } for _, aff := range affinity { affTasks := make(map[string]bool, len(aff)) for _, task := range aff { if len(task) == 0 { continue } if _, exist := taskRef[task]; !exist { return fmt.Errorf("task %s do not exist in job <%s/%s>", task, job.Namespace, job.Name) } if _, exist := affTasks[task]; exist { return fmt.Errorf("task %s is duplicated in job <%s/%s>", task, job.Namespace, job.Name) } affTasks[task] = true } } return nil } func splitAnnotations(job *api.JobInfo, annotation string) ([][]string, error) { affinityStr := strings.Split(annotation, ";") if len(affinityStr) == 0 { return nil, nil } var affinity = make([][]string, len(affinityStr)) for i, str := range affinityStr { affinity[i] = strings.Split(str, ",") } if err := affinityCheck(job, affinity); err != nil { klog.V(4).Infof("Job <%s/%s> affinity key invalid: %s.", job.Namespace, job.Name, err.Error()) return nil, err } return affinity, nil } func readTopologyFromPgAnnotations(job *api.JobInfo) (*TaskTopology, error) { jobAffinityStr, affinityExist := job.PodGroup.Annotations[JobAffinityAnnotations] jobAntiAffinityStr, antiAffinityExist := job.PodGroup.Annotations[JobAntiAffinityAnnotations] taskOrderStr, taskOrderExist := job.PodGroup.Annotations[TaskOrderAnnotations] if !(affinityExist || antiAffinityExist || taskOrderExist) { return nil, nil } var jobTopology = TaskTopology{ Affinity: nil, AntiAffinity: nil, TaskOrder: nil, } if affinityExist { affinities, err := splitAnnotations(job, jobAffinityStr) if err != nil { klog.V(4).Infof("Job <%s/%s> affinity key invalid: %s.", job.Namespace, job.Name, err.Error()) return nil, err } jobTopology.Affinity = affinities } if antiAffinityExist { affinities, err := splitAnnotations(job, jobAntiAffinityStr) if err != nil { klog.V(4).Infof("Job <%s/%s> anti affinity key invalid: %s.", job.Namespace, job.Name, err.Error()) return nil, err } jobTopology.AntiAffinity = affinities } if taskOrderExist { jobTopology.TaskOrder = strings.Split(taskOrderStr, ",") if err := affinityCheck(job, [][]string{jobTopology.TaskOrder}); err != nil { klog.V(4).Infof("Job <%s/%s> task order key invalid: %s.", job.Namespace, job.Name, err.Error()) return nil, err } } return &jobTopology, nil } func (p *taskTopologyPlugin) OnSessionOpen(ssn *framework.Session) { start := time.Now() klog.V(3).Infof("start to init task topology plugin, weight[%d], defined order %v", p.weight, affinityPriority) p.initBucket(ssn) ssn.AddTaskOrderFn(p.Name(), p.TaskOrderFn) ssn.AddNodeOrderFn(p.Name(), p.NodeOrderFn) ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: p.AllocateFunc, }) klog.V(3).Infof("finished to init task topology plugin, using time %v", time.Since(start)) } func (p *taskTopologyPlugin) OnSessionClose(ssn *framework.Session) { p.managers = nil }