pkg/queue/cluster_queue_impl.go (164 lines of code) (raw):
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/heap"
"sigs.k8s.io/kueue/pkg/workload"
)
// clusterQueueBase is an incomplete base implementation of the ClusterQueue
// interface. It can be embedded in other types, which may override its methods.
type clusterQueueBase struct {
// heap holds the workloads that are waiting to be returned by Pop.
heap heap.Heap
// cohort is the cohort name copied from the ClusterQueue spec by Update.
cohort string
// namespaceSelector is built from the ClusterQueue spec by Update. It is
// consulted by QueueInadmissibleWorkloads to decide which inadmissible
// workloads may be moved back into the heap.
namespaceSelector labels.Selector
// inadmissibleWorkloads are workloads that have been tried at least once and
// couldn't be admitted, indexed by workload key.
inadmissibleWorkloads map[string]*workload.Info
// popCycle identifies the last call to Pop. It's incremented when calling Pop.
// popCycle and queueInadmissibleCycle are used to track when there is a requeueing
// of inadmissible workloads while a workload is being scheduled.
popCycle int64
// queueInadmissibleCycle stores the value of popCycle at the time when
// QueueInadmissibleWorkloads was last called. newClusterQueueImpl sets it
// to -1, i.e. "never called".
queueInadmissibleCycle int64
}
// newClusterQueueImpl builds a clusterQueueBase whose heap is keyed by keyFunc
// and ordered by lessFunc. The inadmissible set starts out empty and
// queueInadmissibleCycle starts at -1, which is lower than the zero-valued
// popCycle, so no requeue is considered to have happened yet.
func newClusterQueueImpl(keyFunc func(obj interface{}) string, lessFunc func(a, b interface{}) bool) *clusterQueueBase {
	cq := &clusterQueueBase{}
	cq.heap = heap.New(keyFunc, lessFunc)
	cq.inadmissibleWorkloads = map[string]*workload.Info{}
	cq.queueInadmissibleCycle = -1
	return cq
}
// Update refreshes the cached cohort and namespace selector from apiCQ.
//
// The cohort is assigned before the selector is converted, so it is updated
// even when the conversion fails. On failure the conversion error is returned
// and the previously stored namespace selector is left untouched.
func (c *clusterQueueBase) Update(apiCQ *kueue.ClusterQueue) error {
	c.cohort = apiCQ.Spec.Cohort

	selector, err := metav1.LabelSelectorAsSelector(apiCQ.Spec.NamespaceSelector)
	if err == nil {
		c.namespaceSelector = selector
	}
	return err
}
// Cohort returns the cohort name most recently stored by Update. It is the
// empty string if Update has not stored one.
func (c *clusterQueueBase) Cohort() string {
return c.cohort
}
// AddFromLocalQueue offers every workload held by q to the heap via
// PushIfNotPresent. It reports whether at least one of those pushes succeeded.
func (c *clusterQueueBase) AddFromLocalQueue(q *LocalQueue) bool {
	anyPushed := false
	for _, wl := range q.items {
		// The push is on the left of || so it runs for every workload,
		// even after anyPushed has already become true.
		anyPushed = c.heap.PushIfNotPresent(wl) || anyPushed
	}
	return anyPushed
}
// PushOrUpdate records the latest version of wInfo in the ClusterQueue.
//
// A workload currently held as inadmissible stays there, with its stored info
// replaced, as long as neither its spec nor its Evicted condition changed:
// such an update cannot make it admissible nor alter its position in the
// queue. In every other case the workload is dropped from the inadmissible set
// (if present) and pushed into, or updated within, the heap.
func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
	key := workload.Key(wInfo.Obj)

	if prev := c.inadmissibleWorkloads[key]; prev != nil {
		if equality.Semantic.DeepEqual(prev.Obj.Spec, wInfo.Obj.Spec) {
			prevEvicted := apimeta.FindStatusCondition(prev.Obj.Status.Conditions, kueue.WorkloadEvicted)
			newEvicted := apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)
			if equality.Semantic.DeepEqual(prevEvicted, newEvicted) {
				// Nothing relevant changed: refresh the stored info in place.
				c.inadmissibleWorkloads[key] = wInfo
				return
			}
		}
		// Something relevant changed: hand the workload back to the heap.
		delete(c.inadmissibleWorkloads, key)
	}

	c.heap.PushOrUpdate(wInfo)
}
// Delete removes w from the ClusterQueue, clearing its entry from both the
// heap and the inadmissible set.
func (c *clusterQueueBase) Delete(w *kueue.Workload) {
	wlKey := workload.Key(w)
	c.heap.Delete(wlKey)
	delete(c.inadmissibleWorkloads, wlKey)
}
// DeleteFromLocalQueue removes every workload held by q from this
// ClusterQueue.
//
// Delete already clears a workload from both the inadmissible set and the
// heap, so a single pass over q.items is sufficient; no separate sweep of the
// inadmissible set is needed.
func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) {
	for _, w := range q.items {
		c.Delete(w.Obj)
	}
}
// requeueIfNotPresent puts back a workload that could not be admitted, unless
// the ClusterQueue already tracks it.
//
// When immediate is true, or when QueueInadmissibleWorkloads has been called
// since the last Pop (queueInadmissibleCycle >= popCycle), the workload goes
// straight to the heap; if an inadmissible entry exists for the same key, that
// stored entry is the one pushed and it is removed from the inadmissible set.
// Otherwise the workload is parked in inadmissibleWorkloads, provided it is in
// neither the inadmissible set nor the heap.
//
// The return value reports whether the workload was actually inserted.
func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
	key := workload.Key(wInfo.Obj)
	parked := c.inadmissibleWorkloads[key]

	if immediate || c.queueInadmissibleCycle >= c.popCycle {
		if parked != nil {
			// Prefer the stored entry and take it out of the inadmissible set.
			wInfo = parked
			delete(c.inadmissibleWorkloads, key)
		}
		return c.heap.PushIfNotPresent(wInfo)
	}

	// Already tracked somewhere: nothing to insert.
	if parked != nil || c.heap.GetByKey(key) != nil {
		return false
	}

	c.inadmissibleWorkloads[key] = wInfo
	return true
}
// QueueInadmissibleWorkloads moves inadmissible workloads back to the heap and
// reports whether at least one workload was pushed.
//
// It first records the current popCycle in queueInadmissibleCycle, even when
// there is nothing to move. A workload is moved only if its namespace can be
// fetched through client and that namespace's labels match namespaceSelector;
// workloads failing either check stay in the inadmissible set. A workload that
// passes the checks leaves the inadmissible set even if the heap already
// contained it, in which case it does not count as moved.
func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool {
	c.queueInadmissibleCycle = c.popCycle
	if len(c.inadmissibleWorkloads) == 0 {
		return false
	}

	stillInadmissible := make(map[string]*workload.Info)
	requeued := false
	for key, wInfo := range c.inadmissibleWorkloads {
		var ns corev1.Namespace
		nsKey := types.NamespacedName{Name: wInfo.Obj.Namespace}
		if err := client.Get(ctx, nsKey, &ns); err == nil && c.namespaceSelector.Matches(labels.Set(ns.Labels)) {
			if c.heap.PushIfNotPresent(wInfo) {
				requeued = true
			}
			continue
		}
		stillInadmissible[key] = wInfo
	}

	c.inadmissibleWorkloads = stillInadmissible
	return requeued
}
// Pending returns the total number of workloads tracked by the ClusterQueue:
// those in the heap plus those held as inadmissible.
func (c *clusterQueueBase) Pending() int {
	return c.heap.Len() + len(c.inadmissibleWorkloads)
}
// PendingActive returns the number of workloads currently in the heap.
func (c *clusterQueueBase) PendingActive() int {
return c.heap.Len()
}
// PendingInadmissible returns the number of workloads currently held in
// inadmissibleWorkloads.
func (c *clusterQueueBase) PendingInadmissible() int {
return len(c.inadmissibleWorkloads)
}
// Pop removes and returns the workload at the head of the heap, or nil when
// the heap is empty. popCycle is incremented on every call, including calls
// that find the heap empty.
func (c *clusterQueueBase) Pop() *workload.Info {
	c.popCycle++
	if c.heap.Len() > 0 {
		return c.heap.Pop().(*workload.Info)
	}
	return nil
}
// Dump returns the keys of all workloads currently in the heap. The boolean is
// false, and the set nil, when the heap is empty.
func (c *clusterQueueBase) Dump() (sets.Set[string], bool) {
	size := c.heap.Len()
	if size == 0 {
		return nil, false
	}

	keys := make(sets.Set[string], size)
	for _, item := range c.heap.List() {
		keys.Insert(workload.Key(item.(*workload.Info).Obj))
	}
	return keys, true
}
// DumpInadmissible returns the keys of all workloads currently held as
// inadmissible. The boolean is false, and the set nil, when there are none.
func (c *clusterQueueBase) DumpInadmissible() (sets.Set[string], bool) {
	count := len(c.inadmissibleWorkloads)
	if count == 0 {
		return nil, false
	}

	keys := make(sets.Set[string], count)
	for _, wl := range c.inadmissibleWorkloads {
		keys.Insert(workload.Key(wl.Obj))
	}
	return keys, true
}
// Info returns the workload stored in the heap under key, or nil when the heap
// has no entry for it. The inadmissible set is not consulted.
func (c *clusterQueueBase) Info(key string) *workload.Info {
	if item := c.heap.GetByKey(key); item != nil {
		return item.(*workload.Info)
	}
	return nil
}