pkg/scheduler/plugins/task-topology/manager.go (280 lines of code) (raw):

/* Copyright 2021 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package tasktopology import ( "fmt" "math" "sort" "strings" "k8s.io/apimachinery/pkg/types" "k8s.io/klog" "volcano.sh/volcano/pkg/scheduler/api" ) type topologyType int const ( selfAntiAffinity topologyType = iota interAntiAffinity selfAffinity interAffinity ) // map[topologyType]priority, the larger number means the higher priority var affinityPriority = map[topologyType]int{ selfAntiAffinity: 4, interAffinity: 3, selfAffinity: 2, interAntiAffinity: 1, } // JobManager is struct used to save infos about affinity and buckets of a job type JobManager struct { jobID api.JobID buckets []*Bucket podInBucket map[types.UID]int podInTask map[types.UID]string taskOverPod map[string]map[types.UID]struct{} taskAffinityPriority map[string]int // [taskName] -> priority taskExistOrder map[string]int interAffinity map[string]map[string]struct{} // [taskName]->[taskName] selfAffinity map[string]struct{} interAntiAffinity map[string]map[string]struct{} // [taskName]->[taskName] selfAntiAffinity map[string]struct{} bucketMaxSize int nodeTaskSet map[string]map[string]int // [nodeName]->[taskName] } // NewJobManager creates a new job manager for job func NewJobManager(jobID api.JobID) *JobManager { return &JobManager{ jobID: jobID, buckets: make([]*Bucket, 0), podInBucket: make(map[types.UID]int), podInTask: make(map[types.UID]string), taskOverPod: make(map[string]map[types.UID]struct{}), taskAffinityPriority: make(map[string]int), taskExistOrder: make(map[string]int), interAffinity: make(map[string]map[string]struct{}), interAntiAffinity: make(map[string]map[string]struct{}), selfAffinity: make(map[string]struct{}), selfAntiAffinity: make(map[string]struct{}), bucketMaxSize: 0, nodeTaskSet: make(map[string]map[string]int), } } // MarkOutOfBucket indicates task is outside of any bucket func (jm *JobManager) MarkOutOfBucket(uid types.UID) { jm.podInBucket[uid] = OutOfBucket } // MarkTaskHasTopology indicates task has topology settings func (jm *JobManager) MarkTaskHasTopology(taskName string, topoType topologyType) { priority := affinityPriority[topoType] if priority > jm.taskAffinityPriority[taskName] { jm.taskAffinityPriority[taskName] = priority } } // ApplyTaskTopology transforms taskTopology to matrix // affinity: [[a, b], [c]] // interAffinity: // a b c // a - x - // b x - - // c - - - // selfAffinity: // a b c // - - x func (jm *JobManager) ApplyTaskTopology(topo *TaskTopology) { for _, aff := range topo.Affinity { if len(aff) == 1 { taskName := aff[0] jm.selfAffinity[taskName] = struct{}{} jm.MarkTaskHasTopology(taskName, selfAffinity) continue } for index, src := range aff { for _, dst := range aff[:index] { addAffinity(jm.interAffinity, src, dst) addAffinity(jm.interAffinity, dst, src) } jm.MarkTaskHasTopology(src, interAffinity) } } for _, aff := range topo.AntiAffinity { if len(aff) == 1 { taskName := aff[0] jm.selfAntiAffinity[taskName] = struct{}{} jm.MarkTaskHasTopology(taskName, selfAntiAffinity) continue } for index, src := range aff { for _, dst := range aff[:index] { addAffinity(jm.interAntiAffinity, src, dst) addAffinity(jm.interAntiAffinity, dst, src) } jm.MarkTaskHasTopology(src, interAntiAffinity) } } length := len(topo.TaskOrder) for index, taskName := range topo.TaskOrder { jm.taskExistOrder[taskName] = length - index } } // NewBucket creates a new bucket func (jm *JobManager) NewBucket() *Bucket { bucket := NewBucket() bucket.index = len(jm.buckets) jm.buckets = append(jm.buckets, bucket) return bucket } // AddTaskToBucket adds task into bucket func (jm *JobManager) AddTaskToBucket(bucketIndex int, taskName string, task *api.TaskInfo) { bucket := jm.buckets[bucketIndex] jm.podInBucket[task.Pod.UID] = bucketIndex bucket.AddTask(taskName, task) if size := len(bucket.tasks) + bucket.boundTask; size > jm.bucketMaxSize { jm.bucketMaxSize = size } } // L compared with R, -1 for L < R, 0 for L == R, 1 for L > R func (jm *JobManager) taskAffinityOrder(L, R *api.TaskInfo) int { LTaskName := jm.podInTask[L.Pod.UID] RTaskName := jm.podInTask[R.Pod.UID] // in the same vk task, they are equal if LTaskName == RTaskName { return 0 } // use user defined order firstly LOrder := jm.taskExistOrder[LTaskName] ROrder := jm.taskExistOrder[RTaskName] if LOrder != ROrder { if LOrder > ROrder { return 1 } return -1 } LPriority := jm.taskAffinityPriority[LTaskName] RPriority := jm.taskAffinityPriority[RTaskName] if LPriority != RPriority { if LPriority > RPriority { return 1 } return -1 } // all affinity setting of L and R are the same, they are equal return 0 } func (jm *JobManager) buildTaskInfo(tasks map[api.TaskID]*api.TaskInfo) []*api.TaskInfo { taskWithoutBucket := make([]*api.TaskInfo, 0, len(tasks)) for _, task := range tasks { pod := task.Pod taskName := getTaskName(task) if taskName == "" { jm.MarkOutOfBucket(pod.UID) continue } if _, hasTopology := jm.taskAffinityPriority[taskName]; !hasTopology { jm.MarkOutOfBucket(pod.UID) continue } jm.podInTask[pod.UID] = taskName taskSet, ok := jm.taskOverPod[taskName] if !ok { taskSet = make(map[types.UID]struct{}) jm.taskOverPod[taskName] = taskSet } taskSet[pod.UID] = struct{}{} taskWithoutBucket = append(taskWithoutBucket, task) } return taskWithoutBucket } func (jm *JobManager) checkTaskSetAffinity(taskName string, taskNameSet map[string]int, onlyAnti bool) int { bucketPodAff := 0 if taskName == "" { return bucketPodAff } for taskNameInBucket, count := range taskNameSet { theSameTask := taskNameInBucket == taskName if !onlyAnti { affinity := false if theSameTask { _, affinity = jm.selfAffinity[taskName] } else { _, affinity = jm.interAffinity[taskName][taskNameInBucket] } if affinity { bucketPodAff += count } } antiAffinity := false if theSameTask { _, antiAffinity = jm.selfAntiAffinity[taskName] } else { _, antiAffinity = jm.interAntiAffinity[taskName][taskNameInBucket] } if antiAffinity { bucketPodAff -= count } } return bucketPodAff } func (jm *JobManager) buildBucket(taskWithOrder []*api.TaskInfo) { nodeBucketMapping := make(map[string]*Bucket) for _, task := range taskWithOrder { klog.V(5).Infof("jobID %s task with order task %s/%s", jm.jobID, task.Namespace, task.Name) var selectedBucket *Bucket maxAffinity := math.MinInt32 taskName := getTaskName(task) if task.NodeName != "" { // generate bucket by node maxAffinity = 0 selectedBucket = nodeBucketMapping[task.NodeName] } else { for _, bucket := range jm.buckets { bucketPodAff := jm.checkTaskSetAffinity(taskName, bucket.taskNameSet, false) // choose the best fit affinity, or balance resource between bucket if bucketPodAff > maxAffinity { maxAffinity = bucketPodAff selectedBucket = bucket } else if bucketPodAff == maxAffinity && selectedBucket != nil && bucket.reqScore < selectedBucket.reqScore { selectedBucket = bucket } } } if maxAffinity < 0 || selectedBucket == nil { selectedBucket = jm.NewBucket() if task.NodeName != "" { nodeBucketMapping[task.NodeName] = selectedBucket } } jm.AddTaskToBucket(selectedBucket.index, taskName, task) } } // ConstructBucket builds bucket for tasks func (jm *JobManager) ConstructBucket(tasks map[api.TaskID]*api.TaskInfo) { taskWithoutBucket := jm.buildTaskInfo(tasks) o := TaskOrder{ tasks: taskWithoutBucket, manager: jm, } sort.Sort(sort.Reverse(&o)) jm.buildBucket(o.tasks) } // TaskBound binds task to bucket func (jm *JobManager) TaskBound(task *api.TaskInfo) { if taskName := getTaskName(task); taskName != "" { set, ok := jm.nodeTaskSet[task.NodeName] if !ok { set = make(map[string]int) jm.nodeTaskSet[task.NodeName] = set } set[taskName]++ } bucket := jm.GetBucket(task) if bucket != nil { bucket.TaskBound(task) } } // GetBucket get bucket inside which task has been func (jm *JobManager) GetBucket(task *api.TaskInfo) *Bucket { index, ok := jm.podInBucket[task.Pod.UID] if !ok || index == OutOfBucket { return nil } bucket := jm.buckets[index] return bucket } func (jm *JobManager) String() string { // saa: selfAntiAffinity // iaa: interAntiAffinity // sa: selfAffinity // ia: interAffinity msg := []string{ fmt.Sprintf("%s - job %s max %d || saa: %v - iaa: %v - sa: %v - ia: %v || priority: %v - order: %v || ", PluginName, jm.jobID, jm.bucketMaxSize, jm.selfAntiAffinity, jm.interAntiAffinity, jm.selfAffinity, jm.interAffinity, jm.taskAffinityPriority, jm.taskExistOrder, ), } for _, bucket := range jm.buckets { bucketMsg := fmt.Sprintf("b:%d -- ", bucket.index) var info []string for _, task := range bucket.tasks { info = append(info, task.Pod.Name) } bucketMsg += strings.Join(info, ", ") bucketMsg += "|" info = nil for nodeName, count := range bucket.node { info = append(info, fmt.Sprintf("n%s-%d", nodeName, count)) } bucketMsg += strings.Join(info, ", ") msg = append(msg, "["+bucketMsg+"]") } return strings.Join(msg, " ") }