pkg/scheduler/plugins/util/util.go (249 lines of code) (raw):
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
)
// Votes a plugin callback function can cast when asked whether a job may
// become inqueue, pipelined, or move to another status.
const (
	// Reject indicates that plugin callback function rejects job to be inqueue, pipelined, or other status
	Reject = -1
	// Abstain indicates that plugin callback function abstains in voting job to be inqueue, pipelined, or other status
	Abstain = 0
	// Permit indicates that plugin callback function permits job to be inqueue, pipelined, or other status
	Permit = 1
)
// PodFilter is a predicate over a pod: it returns true when the pod passes
// the filter and false otherwise.
type PodFilter func(*v1.Pod) bool
// PodsLister interface represents anything that can list pods for a scheduler.
type PodsLister interface {
// List returns the list of pods for the given label selector.
List(labels.Selector) ([]*v1.Pod, error)
// FilteredList is similar to List, but the returned slice does not
// contain pods that don't pass podFilter.
FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
}
// PodLister is used in predicate and nodeorder plugin. It lists the pods that
// belong to the tasks it tracks.
type PodLister struct {
// Session is the scheduling session the lister was created for.
Session *framework.Session
// CachedPods holds, per task, a deep copy of the task's pod whose
// Spec.NodeName is set from the task's node name or by UpdateTask.
CachedPods map[api.TaskID]*v1.Pod
// Tasks holds the tasks whose pods are returned by List and FilteredList.
Tasks map[api.TaskID]*api.TaskInfo
// TaskWithAffinity holds the tracked tasks whose pod declares affinity
// according to HaveAffinity.
TaskWithAffinity map[api.TaskID]*api.TaskInfo
}
// PodAffinityLister is used to list pod with affinity. It wraps a PodLister
// and delegates every call to it.
type PodAffinityLister struct {
// pl is the underlying PodLister.
pl *PodLister
}
// HaveAffinity reports whether the pod declares at least one affinity rule:
// node affinity, pod affinity or pod anti-affinity.
func HaveAffinity(pod *v1.Pod) bool {
	rules := pod.Spec.Affinity
	if rules == nil {
		return false
	}
	return rules.NodeAffinity != nil || rules.PodAffinity != nil || rules.PodAntiAffinity != nil
}
// NewPodLister returns a PodLister built from the jobs of ssn. Every task whose
// status index satisfies api.AllocatedStatus is tracked: the task is recorded,
// a copy of its pod carrying the task's node name is cached, and tasks whose
// pod has affinity are additionally indexed in TaskWithAffinity.
func NewPodLister(ssn *framework.Session) *PodLister {
	lister := &PodLister{
		Session:          ssn,
		CachedPods:       map[api.TaskID]*v1.Pod{},
		Tasks:            map[api.TaskID]*api.TaskInfo{},
		TaskWithAffinity: map[api.TaskID]*api.TaskInfo{},
	}

	for _, job := range lister.Session.Jobs {
		for status, group := range job.TaskStatusIndex {
			if api.AllocatedStatus(status) {
				for _, t := range group {
					lister.Tasks[t.UID] = t
					lister.CachedPods[t.UID] = lister.copyTaskPod(t)
					if HaveAffinity(t.Pod) {
						lister.TaskWithAffinity[t.UID] = t
					}
				}
			}
		}
	}

	return lister
}
// NewPodListerFromNode returns a PodLister built from the nodes of ssn rather
// than from its jobs. A task found on a node is tracked when its status
// satisfies api.AllocatedStatus or is api.Releasing; for each such task a copy
// of its pod carrying the task's node name is cached, and tasks whose pod has
// affinity are additionally indexed in TaskWithAffinity.
func NewPodListerFromNode(ssn *framework.Session) *PodLister {
	lister := &PodLister{
		Session:          ssn,
		CachedPods:       map[api.TaskID]*v1.Pod{},
		Tasks:            map[api.TaskID]*api.TaskInfo{},
		TaskWithAffinity: map[api.TaskID]*api.TaskInfo{},
	}

	for _, n := range lister.Session.Nodes {
		for _, t := range n.Tasks {
			if api.AllocatedStatus(t.Status) || t.Status == api.Releasing {
				lister.Tasks[t.UID] = t
				lister.CachedPods[t.UID] = lister.copyTaskPod(t)
				if HaveAffinity(t.Pod) {
					lister.TaskWithAffinity[t.UID] = t
				}
			}
		}
	}

	return lister
}
// copyTaskPod returns a deep copy of the task's pod with Spec.NodeName
// overwritten by the task's node name. The task's own pod is left untouched.
func (pl *PodLister) copyTaskPod(task *api.TaskInfo) *v1.Pod {
	clone := task.Pod.DeepCopy()
	clone.Spec.NodeName = task.NodeName
	return clone
}
// GetPod returns a pod for the task whose node name is the proper one: the
// task's own pod when its Spec.NodeName already equals the task's node name,
// otherwise the cached copy, otherwise a fresh deep copy.
// This function is kept read only to avoid a concurrent panic of the map, so
// a fresh copy is never written back into the cache.
func (pl *PodLister) GetPod(task *api.TaskInfo) *v1.Pod {
	if task.Pod.Spec.NodeName == task.NodeName {
		return task.Pod
	}

	if cached, ok := pl.CachedPods[task.UID]; ok {
		return cached
	}

	// Cache miss: copy on the fly and warn, without touching the cache.
	fresh := pl.copyTaskPod(task)
	klog.Warningf("DeepCopy for pod %s/%s at PodLister.GetPod is unexpected", fresh.Namespace, fresh.Name)
	return fresh
}
// UpdateTask sets nodeName on the cached pod of the task (creating the cached
// copy first when it is missing) and returns that pod. It also refreshes the
// task indexes: a task whose status satisfies api.AllocatedStatus is stored in
// Tasks (and in TaskWithAffinity when its pod has affinity); any other task is
// removed from them.
// NOT thread safe, please ensure UpdateTask is the only called function of PodLister at the same time.
func (pl *PodLister) UpdateTask(task *api.TaskInfo, nodeName string) *v1.Pod {
	cached, ok := pl.CachedPods[task.UID]
	if !ok {
		cached = pl.copyTaskPod(task)
		pl.CachedPods[task.UID] = cached
	}
	cached.Spec.NodeName = nodeName

	if api.AllocatedStatus(task.Status) {
		pl.Tasks[task.UID] = task
		if HaveAffinity(task.Pod) {
			pl.TaskWithAffinity[task.UID] = task
		}
		return cached
	}

	delete(pl.Tasks, task.UID)
	if HaveAffinity(task.Pod) {
		delete(pl.TaskWithAffinity, task.UID)
	}
	return cached
}
// List returns the pods of all tracked tasks whose labels match selector.
// The returned error is always nil.
func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
	var matched []*v1.Pod
	for _, t := range pl.Tasks {
		p := pl.GetPod(t)
		if !selector.Matches(labels.Set(p.Labels)) {
			continue
		}
		matched = append(matched, p)
	}
	return matched, nil
}
// filteredListWithTaskSet returns the pods of the tasks in taskSet that pass
// podFilter and whose labels match selector. The filter is evaluated before
// the selector. The returned error is always nil.
func (pl *PodLister) filteredListWithTaskSet(taskSet map[api.TaskID]*api.TaskInfo, podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	var matched []*v1.Pod
	for _, t := range taskSet {
		p := pl.GetPod(t)
		if !podFilter(p) {
			continue
		}
		if !selector.Matches(labels.Set(p.Labels)) {
			continue
		}
		matched = append(matched, p)
	}
	return matched, nil
}
// FilteredList returns the pods of all tracked tasks that pass podFilter and
// whose labels match selector.
func (pl *PodLister) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	allTasks := pl.Tasks
	return pl.filteredListWithTaskSet(allTasks, podFilter, selector)
}
// AffinityFilteredList returns the pods of the tracked tasks with affinity
// that pass podFilter and whose labels match selector.
func (pl *PodLister) AffinityFilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	affinityTasks := pl.TaskWithAffinity
	return pl.filteredListWithTaskSet(affinityTasks, podFilter, selector)
}
// AffinityLister returns a PodAffinityLister backed by this PodLister.
func (pl *PodLister) AffinityLister() *PodAffinityLister {
	return &PodAffinityLister{pl: pl}
}
// List returns all pods of the underlying PodLister whose labels match
// selector; it is not restricted to pods with affinity.
func (pal *PodAffinityLister) List(selector labels.Selector) ([]*v1.Pod, error) {
	underlying := pal.pl
	return underlying.List(selector)
}
// FilteredList returns the pods with affinity of the underlying PodLister
// that pass podFilter and whose labels match selector.
func (pal *PodAffinityLister) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	underlying := pal.pl
	return underlying.AffinityFilteredList(podFilter, selector)
}
// GenerateNodeMapAndSlice converts the given nodes into scheduler framework
// NodeInfo objects. Despite its name, only a map is returned (no slice); the
// map is keyed by each node's Name field.
//
// Each NodeInfo is created from the pods reported by the node's Pods() method
// and then has the node's v1.Node attached via SetNode.
func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*schedulernodeinfo.NodeInfo {
	// The result holds at most one entry per input node, so size it up front.
	nodeMap := make(map[string]*schedulernodeinfo.NodeInfo, len(nodes))
	for _, node := range nodes {
		nodeInfo := schedulernodeinfo.NewNodeInfo(node.Pods()...)
		nodeInfo.SetNode(node.Node)
		nodeMap[node.Name] = nodeInfo
	}
	return nodeMap
}
// CachedNodeInfo is used in nodeorder and predicate plugin. It looks nodes up
// by name in the nodes of its session.
type CachedNodeInfo struct {
// Session is the scheduling session whose Nodes map is queried.
Session *framework.Session
}
// GetNodeInfo returns the v1.Node of the session node registered under name.
// When the session has no such node, a NotFound API error for the "node"
// resource is returned.
func (c *CachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) {
	if info, ok := c.Session.Nodes[name]; ok {
		return info.Node, nil
	}
	return nil, errors.NewNotFound(v1.Resource("node"), name)
}
// NodeLister is used in nodeorder plugin. It lists the nodes of its session.
type NodeLister struct {
// Session is the scheduling session whose Nodes are listed.
Session *framework.Session
}
// List returns the v1.Node of every node in the session, in map iteration
// (i.e. unspecified) order. The returned error is always nil.
func (nl *NodeLister) List() ([]*v1.Node, error) {
	var all []*v1.Node
	for _, info := range nl.Session.Nodes {
		n := info.Node
		all = append(all, n)
	}
	return all, nil
}
// NormalizeScore rescales, in place, the score of each entry of scores so that
// the highest score maps to maxPriority: score = maxPriority * score / highest
// (integer arithmetic). With reverse set, the result is maxPriority minus that
// value.
//
// The highest score is searched starting from 0. When it is 0, scores are left
// untouched unless reverse is set, in which case every score becomes
// maxPriority.
func NormalizeScore(maxPriority int64, reverse bool, scores []api.ScoredNode) {
	var highest int64
	for i := range scores {
		if s := scores[i].Score; s > highest {
			highest = s
		}
	}

	if highest == 0 {
		if !reverse {
			return
		}
		for i := range scores {
			scores[i].Score = maxPriority
		}
		return
	}

	for i := range scores {
		normalized := maxPriority * scores[i].Score / highest
		if reverse {
			normalized = maxPriority - normalized
		}
		scores[i].Score = normalized
	}
}
// GetAllocatedResource returns the sum of the resource requests (Resreq) of
// all tasks of the job whose status satisfies api.AllocatedStatus.
func GetAllocatedResource(job *api.JobInfo) *api.Resource {
	total := &api.Resource{}
	for status, group := range job.TaskStatusIndex {
		if !api.AllocatedStatus(status) {
			continue
		}
		for _, task := range group {
			total.Add(task.Resreq)
		}
	}
	return total
}
// GetInqueueResource returns the resource still to be reserved for a running
// job whose pods have only partly been allocated: for every resource listed in
// the job's PodGroup Spec.MinResources, the amount by which the minimum exceeds
// what is already recorded in allocated. Resources whose minimum is already
// covered are left at zero / unset.
//
// When the PodGroup declares no MinResources (nil), an empty resource is
// returned.
func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource {
	inqueue := &api.Resource{}

	minResources := job.PodGroup.Spec.MinResources
	if minResources == nil {
		// Nothing is declared as a minimum, so nothing has to be reserved.
		return inqueue
	}

	for rName, rQuantity := range *minResources {
		switch rName {
		case v1.ResourceCPU:
			// allocated.MilliCPU is expressed in millicores, so the minimum
			// must be read in millicores too; Value() would yield whole cores
			// and make the difference (almost) always negative.
			reservedCPU := float64(rQuantity.MilliValue()) - allocated.MilliCPU
			if reservedCPU > 0 {
				inqueue.MilliCPU = reservedCPU
			}
		case v1.ResourceMemory:
			reservedMemory := float64(rQuantity.Value()) - allocated.Memory
			if reservedMemory > 0 {
				inqueue.Memory = reservedMemory
			}
		default:
			// NOTE(review): scalar minimums are read with Value(); confirm that
			// allocated.ScalarResources is stored in the same unit.
			if inqueue.ScalarResources == nil {
				inqueue.ScalarResources = make(map[v1.ResourceName]float64)
			}
			if allocatedMount, ok := allocated.ScalarResources[rName]; !ok {
				// Nothing allocated for this resource yet: reserve the full minimum.
				inqueue.ScalarResources[rName] = float64(rQuantity.Value())
			} else {
				reservedScalarRes := float64(rQuantity.Value()) - allocatedMount
				if reservedScalarRes > 0 {
					inqueue.ScalarResources[rName] = reservedScalarRes
				}
			}
		}
	}
	return inqueue
}