pkg/scheduler/plugins/task-topology/util.go (64 lines of code) (raw):

/* Copyright 2021 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package tasktopology import ( "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" ) const ( // PluginName indicates name of volcano scheduler plugin PluginName = "task-topology" // PluginWeight is task-topology plugin weight in nodeOrderFn PluginWeight = "task-topology.weight" // JobAffinityKey is the key to read in task-topology arguments from job annotations JobAffinityKey = "volcano.sh/task-topology" // OutOfBucket indicates task is outside of any bucket OutOfBucket = -1 // JobAffinityAnnotations is the key to read in task-topology affinity arguments from podgroup annotations JobAffinityAnnotations = "volcano.sh/task-topology-affinity" // JobAntiAffinityAnnotations is the key to read in task-topology anti-affinity arguments from podgroup annotations JobAntiAffinityAnnotations = "volcano.sh/task-topology-anti-affinity" // TaskOrderAnnotations is the key to read in task-topology task order arguments from podgroup annotations TaskOrderAnnotations = "volcano.sh/task-topology-task-order" ) // TaskTopology is struct used to save affinity infos of a job read from job plugin or annotations type TaskTopology struct { Affinity [][]string `json:"affinity,omitempty"` AntiAffinity [][]string `json:"antiAffinity,omitempty"` TaskOrder []string `json:"taskOrder,omitempty"` } func calculateWeight(args framework.Arguments) int { /* User Should give taskTopologyWeight in this format(task-topology.weight). actions: "enqueue, reclaim, allocate, backfill, preempt" tiers: - plugins: - name: task-topology arguments: task-topology.weight: 10 */ // Values are initialized to 1. weight := 1 args.GetInt(&weight, PluginWeight) return weight } func getTaskName(task *api.TaskInfo) string { return task.Pod.Annotations[v1alpha1.TaskSpecKey] } func addAffinity(m map[string]map[string]struct{}, src, dst string) { srcMap, ok := m[src] if !ok { srcMap = make(map[string]struct{}) m[src] = srcMap } srcMap[dst] = struct{}{} } func noPendingTasks(job *api.JobInfo) bool { return len(job.TaskStatusIndex[api.Pending]) == 0 } // TaskOrder is struct used to save task order type TaskOrder struct { tasks []*api.TaskInfo manager *JobManager } func (p *TaskOrder) Len() int { return len(p.tasks) } func (p *TaskOrder) Swap(l, r int) { p.tasks[l], p.tasks[r] = p.tasks[r], p.tasks[l] } func (p *TaskOrder) Less(l, r int) bool { L := p.tasks[l] R := p.tasks[r] LHasNode := L.NodeName != "" RHasNode := R.NodeName != "" if LHasNode || RHasNode { // the task bounded would have high priority if LHasNode != RHasNode { return !LHasNode } // all bound, any order is alright return L.NodeName > R.NodeName } result := p.manager.taskAffinityOrder(L, R) // they have the same taskAffinity order, any order is alright if result == 0 { return L.Name > R.Name } return result < 0 }