pkg/scheduler/framework/session_plugins.go (603 lines of code) (raw):
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/scheduler/api"
)
// AddJobOrderFn add job order function
func (ssn *Session) AddJobOrderFn(name string, cf api.CompareFn) {
ssn.jobOrderFns[name] = cf
}
// AddQueueOrderFn add queue order function
func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) {
ssn.queueOrderFns[name] = qf
}
// AddClusterOrderFn add queue order function
func (ssn *Session) AddClusterOrderFn(name string, qf api.CompareFn) {
ssn.clusterOrderFns[name] = qf
}
// AddTaskOrderFn add task order function
func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) {
ssn.taskOrderFns[name] = cf
}
// AddNamespaceOrderFn add namespace order function
func (ssn *Session) AddNamespaceOrderFn(name string, cf api.CompareFn) {
ssn.namespaceOrderFns[name] = cf
}
// AddPreemptableFn add preemptable function
func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) {
ssn.preemptableFns[name] = cf
}
// AddReclaimableFn add Reclaimable function
func (ssn *Session) AddReclaimableFn(name string, rf api.EvictableFn) {
ssn.reclaimableFns[name] = rf
}
// AddJobReadyFn add JobReady function
func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) {
ssn.jobReadyFns[name] = vf
}
// AddJobPipelinedFn add pipelined function
func (ssn *Session) AddJobPipelinedFn(name string, vf api.VoteFn) {
ssn.jobPipelinedFns[name] = vf
}
// AddPredicateFn add Predicate function
func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}
// AddBestNodeFn add BestNode function
func (ssn *Session) AddBestNodeFn(name string, pf api.BestNodeFn) {
ssn.bestNodeFns[name] = pf
}
// AddNodeOrderFn add Node order function
func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) {
ssn.nodeOrderFns[name] = pf
}
// AddBatchNodeOrderFn add Batch Node order function
func (ssn *Session) AddBatchNodeOrderFn(name string, pf api.BatchNodeOrderFn) {
ssn.batchNodeOrderFns[name] = pf
}
// AddNodeMapFn add Node map function
func (ssn *Session) AddNodeMapFn(name string, pf api.NodeMapFn) {
ssn.nodeMapFns[name] = pf
}
// AddNodeReduceFn add Node reduce function
func (ssn *Session) AddNodeReduceFn(name string, pf api.NodeReduceFn) {
ssn.nodeReduceFns[name] = pf
}
// AddOverusedFn add overused function
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
ssn.overusedFns[name] = fn
}
// AddAllocatableFn add allocatable function
func (ssn *Session) AddAllocatableFn(name string, fn api.AllocatableFn) {
ssn.allocatableFns[name] = fn
}
// AddJobValidFn add jobvalid function
func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) {
ssn.jobValidFns[name] = fn
}
// AddJobEnqueueableFn add jobenqueueable function
func (ssn *Session) AddJobEnqueueableFn(name string, fn api.VoteFn) {
ssn.jobEnqueueableFns[name] = fn
}
// AddJobEnqueuedFn add jobEnqueued function
func (ssn *Session) AddJobEnqueuedFn(name string, fn api.JobEnqueuedFn) {
ssn.jobEnqueuedFns[name] = fn
}
// AddTargetJobFn add targetjob function
func (ssn *Session) AddTargetJobFn(name string, fn api.TargetJobFn) {
ssn.targetJobFns[name] = fn
}
// AddReservedNodesFn add reservedNodesFn function
func (ssn *Session) AddReservedNodesFn(name string, fn api.ReservedNodesFn) {
ssn.reservedNodesFns[name] = fn
}
// AddVictimTasksFns add victimTasksFns function
func (ssn *Session) AddVictimTasksFns(name string, fn api.VictimTasksFn) {
ssn.victimTasksFns[name] = fn
}
// AddJobStarvingFns add jobStarvingFns function
func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) {
ssn.jobStarvingFns[name] = fn
}
// Reclaimable invoke reclaimable function of the plugins
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
var init bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledReclaimable) {
continue
}
rf, found := ssn.reclaimableFns[plugin.Name]
if !found {
continue
}
candidates, abstain := rf(reclaimer, reclaimees)
if abstain == 0 {
continue
}
if len(candidates) == 0 {
victims = nil
break
}
if !init {
victims = candidates
init = true
} else {
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
}
}
}
// Update victims to intersection
victims = intersection
}
}
// Plugins in this tier made decision if victims is not nil
if victims != nil {
return victims
}
}
return victims
}
// Preemptable invoke preemptable function of the plugins
func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
var init bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPreemptable) {
continue
}
pf, found := ssn.preemptableFns[plugin.Name]
if !found {
continue
}
candidates, abstain := pf(preemptor, preemptees)
if abstain == 0 {
continue
}
// intersection will be nil if length is 0, don't need to do any more check
if len(candidates) == 0 {
victims = nil
break
}
if !init {
victims = candidates
init = true
} else {
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
}
}
}
// Update victims to intersection
victims = intersection
}
}
// Plugins in this tier made decision if victims is not nil
if victims != nil {
return victims
}
}
return victims
}
// Overused invoke overused function of the plugins
func (ssn *Session) Overused(queue *api.QueueInfo) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
of, found := ssn.overusedFns[plugin.Name]
if !found {
continue
}
if of(queue) {
return true
}
}
}
return false
}
// Allocatable invoke allocatable function of the plugins
func (ssn *Session) Allocatable(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
af, found := ssn.allocatableFns[plugin.Name]
if !found {
continue
}
if !af(queue, candidate) {
return false
}
}
}
return true
}
// JobReady invoke jobready function of the plugins
func (ssn *Session) JobReady(obj interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobReady) {
continue
}
jrf, found := ssn.jobReadyFns[plugin.Name]
if !found {
continue
}
if !jrf(obj) {
return false
}
}
}
return true
}
// JobPipelined invoke pipelined function of the plugins
// Check if job has get enough resource to run
func (ssn *Session) JobPipelined(obj interface{}) bool {
var hasFound bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobPipelined) {
continue
}
jrf, found := ssn.jobPipelinedFns[plugin.Name]
if !found {
continue
}
res := jrf(obj)
if res < 0 {
return false
}
if res > 0 {
hasFound = true
}
}
// if plugin exists that votes permit, meanwhile other plugin votes abstention,
// permit job to be pipelined, do not check next tier
if hasFound {
return true
}
}
return true
}
// JobStarving invoke jobStarving function of the plugins
// Check if job still need more resource
func (ssn *Session) JobStarving(obj interface{}) bool {
var hasFound bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobStarving) {
continue
}
jrf, found := ssn.jobStarvingFns[plugin.Name]
if !found {
continue
}
hasFound = true
if !jrf(obj) {
return false
}
}
// this tier registered function
if hasFound {
return true
}
}
return false
}
// JobValid invoke jobvalid function of the plugins
func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
jrf, found := ssn.jobValidFns[plugin.Name]
if !found {
continue
}
if vr := jrf(obj); vr != nil && !vr.Pass {
return vr
}
}
}
return nil
}
// JobEnqueueable invoke jobEnqueueableFns function of the plugins
func (ssn *Session) JobEnqueueable(obj interface{}) bool {
var hasFound bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobEnqueued) {
continue
}
fn, found := ssn.jobEnqueueableFns[plugin.Name]
if !found {
continue
}
res := fn(obj)
if res < 0 {
return false
}
if res > 0 {
hasFound = true
}
}
// if plugin exists that votes permit, meanwhile other plugin votes abstention,
// permit job to be enqueueable, do not check next tier
if hasFound {
return true
}
}
return true
}
// JobEnqueued invoke jobEnqueuedFns function of the plugins
func (ssn *Session) JobEnqueued(obj interface{}) {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobEnqueued) {
continue
}
fn, found := ssn.jobEnqueuedFns[plugin.Name]
if !found {
continue
}
fn(obj)
}
}
}
// TargetJob invoke targetJobFns function of the plugins
func (ssn *Session) TargetJob(jobs []*api.JobInfo) *api.JobInfo {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledTargetJob) {
continue
}
fn, found := ssn.targetJobFns[plugin.Name]
if !found {
continue
}
return fn(jobs)
}
}
return nil
}
// VictimTasks invoke ReservedNodes function of the plugins
func (ssn *Session) VictimTasks() []*api.TaskInfo {
var victims []*api.TaskInfo
var init bool
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledVictim) {
continue
}
pf, found := ssn.victimTasksFns[plugin.Name]
if !found {
continue
}
candidates := pf()
if !init {
victims = candidates
init = true
} else {
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
}
}
}
// Update victims to intersection
victims = intersection
}
}
// Plugins in this tier made decision if victims is not nil
if victims != nil {
return victims
}
}
return victims
}
// ReservedNodes invoke ReservedNodes function of the plugins
func (ssn *Session) ReservedNodes() {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledReservedNodes) {
continue
}
fn, found := ssn.reservedNodesFns[plugin.Name]
if !found {
continue
}
fn()
}
}
}
// JobOrderFn invoke joborder function of the plugins
func (ssn *Session) JobOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledJobOrder) {
continue
}
jof, found := ssn.jobOrderFns[plugin.Name]
if !found {
continue
}
if j := jof(l, r); j != 0 {
return j < 0
}
}
}
// If no job order funcs, order job by CreationTimestamp first, then by UID.
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)
if lv.CreationTimestamp.Equal(&rv.CreationTimestamp) {
return lv.UID < rv.UID
}
return lv.CreationTimestamp.Before(&rv.CreationTimestamp)
}
// NamespaceOrderFn invoke namespaceorder function of the plugins
func (ssn *Session) NamespaceOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledNamespaceOrder) {
continue
}
nof, found := ssn.namespaceOrderFns[plugin.Name]
if !found {
continue
}
if j := nof(l, r); j != 0 {
return j < 0
}
}
}
// TODO(lminzhw): if all NamespaceOrderFn treat these two namespace as the same,
// we should make the job order have its affect among namespaces.
// or just schedule namespace one by one
lv := l.(api.NamespaceName)
rv := r.(api.NamespaceName)
return lv < rv
}
// ClusterOrderFn invoke ClusterOrderFn function of the plugins
func (ssn *Session) ClusterOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledClusterOrder) {
continue
}
cof, found := ssn.clusterOrderFns[plugin.Name]
if !found {
continue
}
if j := cof(l, r); j != 0 {
return j < 0
}
}
}
// If no cluster order funcs, order cluster by ClusterID
lv := l.(*scheduling.Cluster)
rv := r.(*scheduling.Cluster)
return lv.Name < rv.Name
}
// QueueOrderFn invoke queueorder function of the plugins
func (ssn *Session) QueueOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledQueueOrder) {
continue
}
qof, found := ssn.queueOrderFns[plugin.Name]
if !found {
continue
}
if j := qof(l, r); j != 0 {
return j < 0
}
}
}
// If no queue order funcs, order queue by CreationTimestamp first, then by UID.
lv := l.(*api.QueueInfo)
rv := r.(*api.QueueInfo)
if lv.Queue.CreationTimestamp.Equal(&rv.Queue.CreationTimestamp) {
return lv.UID < rv.UID
}
return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp)
}
// TaskCompareFns invoke taskorder function of the plugins
func (ssn *Session) TaskCompareFns(l, r interface{}) int {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledTaskOrder) {
continue
}
tof, found := ssn.taskOrderFns[plugin.Name]
if !found {
continue
}
if j := tof(l, r); j != 0 {
return j
}
}
}
return 0
}
// TaskOrderFn invoke taskorder function of the plugins
func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
if res := ssn.TaskCompareFns(l, r); res != 0 {
return res < 0
}
// If no task order funcs, order task by default func.
lv := l.(*api.TaskInfo)
rv := r.(*api.TaskInfo)
return helpers.CompareTask(lv, rv)
}
// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicate) {
continue
}
pfn, found := ssn.predicateFns[plugin.Name]
if !found {
continue
}
err := pfn(task, node)
if err != nil {
return err
}
}
}
return nil
}
// BestNodeFn invoke bestNode function of the plugins
func (ssn *Session) BestNodeFn(task *api.TaskInfo, nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledBestNode) {
continue
}
pfn, found := ssn.bestNodeFns[plugin.Name]
if !found {
continue
}
// Only the first plugin that enables and realizes bestNodeFn is allowed to choose best node for task
if bestNode := pfn(task, nodeScores); bestNode != nil {
return bestNode
}
}
}
return nil
}
// NodeOrderFn invoke node order function of the plugins
func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
priorityScore := 0.0
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledNodeOrder) {
continue
}
pfn, found := ssn.nodeOrderFns[plugin.Name]
if !found {
continue
}
score, err := pfn(task, node)
if err != nil {
return 0, err
}
priorityScore += score
}
}
return priorityScore, nil
}
// BatchNodeOrderFn invoke node order function of the plugins
func (ssn *Session) BatchNodeOrderFn(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
priorityScore := make(map[string]float64, len(nodes))
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledNodeOrder) {
continue
}
pfn, found := ssn.batchNodeOrderFns[plugin.Name]
if !found {
continue
}
score, err := pfn(task, nodes)
if err != nil {
return nil, err
}
for nodeName, score := range score {
priorityScore[nodeName] += score
}
}
}
return priorityScore, nil
}
func isEnabled(enabled *bool) bool {
return enabled != nil && *enabled
}
// NodeOrderMapFn invoke node order function of the plugins
func (ssn *Session) NodeOrderMapFn(task *api.TaskInfo, node *api.NodeInfo) (map[string]float64, float64, error) {
nodeScoreMap := map[string]float64{}
var priorityScore float64
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledNodeOrder) {
continue
}
if pfn, found := ssn.nodeOrderFns[plugin.Name]; found {
score, err := pfn(task, node)
if err != nil {
return nodeScoreMap, priorityScore, err
}
priorityScore += score
}
if pfn, found := ssn.nodeMapFns[plugin.Name]; found {
score, err := pfn(task, node)
if err != nil {
return nodeScoreMap, priorityScore, err
}
nodeScoreMap[plugin.Name] = score
}
}
}
return nodeScoreMap, priorityScore, nil
}
// NodeOrderReduceFn invoke node order function of the plugins
func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map[string]k8sframework.NodeScoreList) (map[string]float64, error) {
nodeScoreMap := map[string]float64{}
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledNodeOrder) {
continue
}
pfn, found := ssn.nodeReduceFns[plugin.Name]
if !found {
continue
}
if err := pfn(task, pluginNodeScoreMap[plugin.Name]); err != nil {
return nodeScoreMap, err
}
for _, hp := range pluginNodeScoreMap[plugin.Name] {
nodeScoreMap[hp.Name] += float64(hp.Score)
}
}
}
return nodeScoreMap, nil
}