pkg/scheduler/util/scheduler_helper.go (169 lines of code) (raw):
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/scheduler/api"
)
// baselinePercentageOfNodesToFind is the starting percentage from which
// CalculateNumOfFeasibleNodesToFind derives its adaptive percentage when no
// positive percentage is configured.
const baselinePercentageOfNodesToFind = 50
// lastProcessedNodeIndex is not referenced by any function visible in this file.
// NOTE(review): presumably the position where a previous node scan stopped —
// confirm against its users elsewhere in the package.
var lastProcessedNodeIndex int
// Reservation is the package-level ResourceReservation instance used to record
// the target job and the locked nodes.
var Reservation *ResourceReservation
// init populates Reservation with a fresh, empty ResourceReservation when the
// package is loaded.
func init() {
Reservation = NewResourceReservation()
}
// CalculateNumOfFeasibleNodesToFind reports how many feasible nodes the
// scheduler should collect before it stops searching for more.
//
// All nodes are returned when the cluster has no more than
// options.ServerOpts.MinNodesToFind nodes, or when the configured percentage is
// 100 or higher. Otherwise the configured percentage is applied; if it is zero
// or negative, an adaptive percentage is used instead: it starts at
// baselinePercentageOfNodesToFind, drops by one point for every 125 nodes, and
// is clamped from below by MinPercentageOfNodesToFind. The result is never
// smaller than MinNodesToFind.
func CalculateNumOfFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	cfg := options.ServerOpts

	// Small clusters and "search everything" configurations short-circuit.
	if cfg.PercentageOfNodesToFind >= 100 || numAllNodes <= cfg.MinNodesToFind {
		return numAllNodes
	}

	percentage := cfg.PercentageOfNodesToFind
	if percentage <= 0 {
		// No explicit percentage: shrink the share as the cluster grows.
		percentage = baselinePercentageOfNodesToFind - numAllNodes/125
		if percentage < cfg.MinPercentageOfNodesToFind {
			percentage = cfg.MinPercentageOfNodesToFind
		}
	}

	numNodes = numAllNodes * percentage / 100
	if numNodes < cfg.MinNodesToFind {
		return cfg.MinNodesToFind
	}
	return numNodes
}
// PrioritizeNodes scores every node for the given task and groups the nodes by
// their final score: the returned map's keys are scores and its values are the
// nodes that obtained that score.
//
// Scoring happens in three stages:
//  1. mapFn is invoked for each node through workqueue.ParallelizeUntil with 16
//     workers. It yields per-plugin scores (floored to int64 and collected per
//     plugin) and an order score for the node. A node whose mapFn call fails is
//     logged and contributes nothing to this stage.
//  2. reduceFn turns the collected per-plugin score lists into one score per
//     node name.
//  3. batchFn produces one additional score per node name for the whole batch.
//
// A node's final score is its reduced score (0.0 when reduceFn returned none
// for it) plus its order score and batch score, when those exist. If reduceFn
// or batchFn fails, the error is logged and an empty map is returned.
func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo {
	var mu sync.Mutex
	scoresByPlugin := map[string]k8sframework.NodeScoreList{}
	orderScores := map[string]float64{}
	result := map[float64][]*api.NodeInfo{}

	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(i int) {
		candidate := nodes[i]
		pluginScores, orderScore, err := mapFn(task, candidate)
		if err != nil {
			klog.Errorf("Error in Calculating Priority for the node:%v", err)
			return
		}

		// The maps are shared between workers; only touch them under the lock.
		mu.Lock()
		defer mu.Unlock()
		for pluginName, raw := range pluginScores {
			scoresByPlugin[pluginName] = append(scoresByPlugin[pluginName], k8sframework.NodeScore{
				Name:  candidate.Name,
				Score: int64(math.Floor(raw)),
			})
		}
		orderScores[candidate.Name] = orderScore
	})

	reduced, err := reduceFn(task, scoresByPlugin)
	if err != nil {
		klog.Errorf("Error in Calculating Priority for the node:%v", err)
		return result
	}

	batched, err := batchFn(task, nodes)
	if err != nil {
		klog.Errorf("Error in Calculating batch Priority for the node, err %v", err)
		return result
	}

	for _, candidate := range nodes {
		// A missing entry reads as 0.0, the default when no plugin scored the node.
		total := reduced[candidate.Name]
		if extra, ok := orderScores[candidate.Name]; ok {
			total += extra
		}
		if extra, ok := batched[candidate.Name]; ok {
			total += extra
		}
		result[total] = append(result[total], candidate)
	}
	return result
}
// SortNodes flattens a score-to-nodes map into a single slice ordered from the
// highest score to the lowest. Nodes sharing a score keep the order they have
// inside their bucket. The result is nil when the map holds no nodes.
func SortNodes(nodeScores map[float64][]*api.NodeInfo) []*api.NodeInfo {
	scores := make([]float64, 0, len(nodeScores))
	for s := range nodeScores {
		scores = append(scores, s)
	}
	sort.Float64s(scores)

	// Scores are sorted ascending, so walk them backwards for descending order.
	var ordered []*api.NodeInfo
	for i := len(scores) - 1; i >= 0; i-- {
		ordered = append(ordered, nodeScores[scores[i]]...)
	}
	return ordered
}
// SelectBestNode returns a node from the bucket with the highest score. When
// several nodes share that score, one of them is picked at random with
// rand.Intn. It returns nil when nodeScores is empty or when the best bucket
// contains no nodes.
//
// The search starts from negative infinity rather than a fixed floor, so maps
// whose scores are all negative (including values <= -1) still yield a node.
// NaN keys never compare greater than the running maximum and are therefore
// never selected.
func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
	var bestNodes []*api.NodeInfo
	maxScore := math.Inf(-1)
	for score, nodes := range nodeScores {
		if score > maxScore {
			maxScore = score
			bestNodes = nodes
		}
	}

	if len(bestNodes) == 0 {
		return nil
	}
	return bestNodes[rand.Intn(len(bestNodes))]
}
// GetNodeList looks up each name in nodeList in the nodes map and returns the
// matching entries in nodeList order. Names that are not present in the map
// are skipped. The result is always non-nil.
func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeInfo {
	selected := make([]*api.NodeInfo, 0, len(nodeList))
	for i := range nodeList {
		info, found := nodes[nodeList[i]]
		if !found {
			continue
		}
		selected = append(selected, info)
	}
	return selected
}
// ValidateVictims checks whether evicting the given victims from node would
// leave enough resources for the preemptor.
//
// It returns an error when victims is empty, or when the preemptor's
// InitResreq is not LessEqual (with api.Zero as the default) to the node's
// FutureIdle() plus the Resreq of every victim. It returns nil otherwise.
// NOTE(review): this accumulates into the value returned by node.FutureIdle();
// confirm that it is a fresh copy and not shared node state.
func ValidateVictims(preemptor *api.TaskInfo, node *api.NodeInfo, victims []*api.TaskInfo) error {
	if len(victims) == 0 {
		return fmt.Errorf("no victims")
	}

	// Resources expected to be free once every victim has been evicted.
	available := node.FutureIdle()
	for i := range victims {
		available.Add(victims[i].Resreq)
	}

	// Every resource the preemptor requests must fit into what becomes idle.
	if preemptor.InitResreq.LessEqual(available, api.Zero) {
		return nil
	}
	return fmt.Errorf("not enough resources: requested <%v>, but future idle <%v>",
		preemptor.InitResreq, available)
}
// ResourceReservation is the struct used for resource reservation. It pairs a
// target job with a set of locked nodes.
type ResourceReservation struct {
// TargetJob is the job the reservation targets; NewResourceReservation leaves it nil.
TargetJob *api.JobInfo
// LockedNodes holds the locked nodes.
// NOTE(review): keys are presumably node names — confirm against callers.
LockedNodes map[string]*api.NodeInfo
}
// NewResourceReservation builds an empty ResourceReservation: it has no target
// job and an initialized, empty LockedNodes map. It is used to create the
// global Reservation instance.
func NewResourceReservation() *ResourceReservation {
	reservation := new(ResourceReservation)
	reservation.LockedNodes = make(map[string]*api.NodeInfo)
	return reservation
}
// GetMinInt returns the smallest of the given ints, or 0 when called with no
// arguments.
func GetMinInt(vals ...int) int {
	if len(vals) == 0 {
		return 0
	}

	smallest := vals[0]
	for _, candidate := range vals[1:] {
		if candidate < smallest {
			smallest = candidate
		}
	}
	return smallest
}