// pkg/controller/core/clusterqueue_controller.go
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"context"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/workload"
)
// ClusterQueueUpdateWatcher is implemented by components that want to be
// notified when a ClusterQueue changes. The first argument is the old object
// (nil on create) and the second is the new object (nil on delete).
type ClusterQueueUpdateWatcher interface {
	NotifyClusterQueueUpdate(*kueue.ClusterQueue, *kueue.ClusterQueue)
}
// ClusterQueueReconciler reconciles a ClusterQueue object
type ClusterQueueReconciler struct {
	client   client.Client
	log      logr.Logger
	qManager *queue.Manager // queue manager tracking pending workloads
	cache    *cache.Cache   // cache of cluster queue state and usage
	// wlUpdateCh receives workload events forwarded from the workload
	// controller (via NotifyWorkloadUpdate) as a channel Source.
	wlUpdateCh chan event.GenericEvent
	// rfUpdateCh receives resource flavor events forwarded from the
	// resource flavor controller (via NotifyResourceFlavorUpdate).
	rfUpdateCh chan event.GenericEvent
	// watchers are notified on every ClusterQueue create/update/delete.
	watchers []ClusterQueueUpdateWatcher
}
// NewClusterQueueReconciler builds a ClusterQueueReconciler wired to the given
// client, queue manager and cache. The optional watchers are notified on every
// ClusterQueue event.
func NewClusterQueueReconciler(
	client client.Client,
	qMgr *queue.Manager,
	cache *cache.Cache,
	watchers ...ClusterQueueUpdateWatcher,
) *ClusterQueueReconciler {
	r := ClusterQueueReconciler{
		client:   client,
		log:      ctrl.Log.WithName("cluster-queue-reconciler"),
		qManager: qMgr,
		cache:    cache,
		// Buffered so notifiers don't block while the controller is busy.
		wlUpdateCh: make(chan event.GenericEvent, updateChBuffer),
		rfUpdateCh: make(chan event.GenericEvent, updateChBuffer),
		watchers:   watchers,
	}
	return &r
}
//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/finalizers,verbs=update
// Reconcile manages the lifecycle of a single ClusterQueue: it installs the
// in-use finalizer, drives graceful termination on deletion, and keeps the
// Active status condition in sync with the cache's view of the queue.
func (r *ClusterQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var cqObj kueue.ClusterQueue
	if err := r.client.Get(ctx, req.NamespacedName, &cqObj); err != nil {
		// We'll ignore not-found errors, since there is nothing to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	log := ctrl.LoggerFrom(ctx).WithValues("clusterQueue", klog.KObj(&cqObj))
	ctx = ctrl.LoggerInto(ctx, log)
	log.V(2).Info("Reconciling ClusterQueue")

	if cqObj.ObjectMeta.DeletionTimestamp.IsZero() {
		// Although we'll add the finalizer via webhook mutation now, this is
		// still useful as a fallback.
		if !controllerutil.ContainsFinalizer(&cqObj, kueue.ResourceInUseFinalizerName) {
			controllerutil.AddFinalizer(&cqObj, kueue.ResourceInUseFinalizerName)
			if err := r.client.Update(ctx, &cqObj); err != nil {
				return ctrl.Result{}, client.IgnoreNotFound(err)
			}
		}
	} else {
		if !r.cache.ClusterQueueTerminating(cqObj.Name) {
			r.cache.TerminateClusterQueue(cqObj.Name)
		}
		if controllerutil.ContainsFinalizer(&cqObj, kueue.ResourceInUseFinalizerName) {
			// The clusterQueue is being deleted, remove the finalizer only if
			// there are no active admitted workloads.
			if r.cache.ClusterQueueEmpty(cqObj.Name) {
				controllerutil.RemoveFinalizer(&cqObj, kueue.ResourceInUseFinalizerName)
				if err := r.client.Update(ctx, &cqObj); err != nil {
					return ctrl.Result{}, client.IgnoreNotFound(err)
				}
			}
			return ctrl.Result{}, nil
		}
	}

	// Select the Active condition from the cache's view of the queue, then
	// push it through a single status-update call site.
	newCQObj := cqObj.DeepCopy()
	var (
		conditionStatus metav1.ConditionStatus
		reason, msg     string
	)
	switch {
	case r.cache.ClusterQueueActive(newCQObj.Name):
		conditionStatus = metav1.ConditionTrue
		reason = "Ready"
		msg = "Can admit new workloads"
	case r.cache.ClusterQueueTerminating(newCQObj.Name):
		conditionStatus = metav1.ConditionFalse
		reason = "Terminating"
		msg = "Can't admit new workloads; clusterQueue is terminating"
	default:
		conditionStatus = metav1.ConditionFalse
		reason = "FlavorNotFound"
		msg = "Can't admit new workloads; some flavors are not found"
	}
	if err := r.updateCqStatusIfChanged(ctx, newCQObj, conditionStatus, reason, msg); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	return ctrl.Result{}, nil
}
// NotifyWorkloadUpdate forwards a workload change to the update channel so the
// affected ClusterQueue(s) get reconciled. On a queue move (QueueName changed)
// both the old and the new workload are forwarded.
func (r *ClusterQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload) {
	if oldWl == nil {
		// Pure create: only the new workload is relevant.
		if newWl != nil {
			r.wlUpdateCh <- event.GenericEvent{Object: newWl}
		}
		return
	}
	r.wlUpdateCh <- event.GenericEvent{Object: oldWl}
	if newWl != nil && oldWl.Spec.QueueName != newWl.Spec.QueueName {
		r.wlUpdateCh <- event.GenericEvent{Object: newWl}
	}
}
// notifyWatchers fans a ClusterQueue change out to every registered watcher.
func (r *ClusterQueueReconciler) notifyWatchers(oldCQ, newCQ *kueue.ClusterQueue) {
	for _, watcher := range r.watchers {
		watcher.NotifyClusterQueueUpdate(oldCQ, newCQ)
	}
}
// NotifyResourceFlavorUpdate forwards a resource flavor change to the flavor
// update channel so queues using it get reconciled.
func (r *ClusterQueueReconciler) NotifyResourceFlavorUpdate(rf *kueue.ResourceFlavor) {
	ev := event.GenericEvent{Object: rf}
	r.rfUpdateCh <- ev
}
// Event handlers return true to signal the controller to reconcile the
// ClusterQueue associated with the event.

// Create registers a newly observed ClusterQueue with the cache and the queue
// manager, and notifies watchers. Non-ClusterQueue objects pass through.
func (r *ClusterQueueReconciler) Create(e event.CreateEvent) bool {
	cq, isCQ := e.Object.(*kueue.ClusterQueue)
	if !isCQ {
		// No need to interact with the cache for other objects.
		return true
	}
	defer r.notifyWatchers(nil, cq)
	log := r.log.WithValues("clusterQueue", klog.KObj(cq))
	log.V(2).Info("ClusterQueue create event")
	ctx := ctrl.LoggerInto(context.Background(), log)
	if err := r.cache.AddClusterQueue(ctx, cq); err != nil {
		log.Error(err, "Failed to add clusterQueue to cache")
	}
	if err := r.qManager.AddClusterQueue(ctx, cq); err != nil {
		log.Error(err, "Failed to add clusterQueue to queue manager")
	}
	return true
}
// Delete removes a ClusterQueue from the cache and the queue manager, and
// notifies watchers. Non-ClusterQueue objects pass through.
func (r *ClusterQueueReconciler) Delete(e event.DeleteEvent) bool {
	cq, isCQ := e.Object.(*kueue.ClusterQueue)
	if !isCQ {
		// No need to interact with the cache for other objects.
		return true
	}
	defer r.notifyWatchers(cq, nil)
	r.log.V(2).Info("ClusterQueue delete event", "clusterQueue", klog.KObj(cq))
	r.cache.DeleteClusterQueue(cq)
	r.qManager.DeleteClusterQueue(cq)
	return true
}
// Update propagates a ClusterQueue spec change to the cache and the queue
// manager, and notifies watchers. It always returns true so the reconciler
// also runs. Updates on a queue that is being deleted (DeletionTimestamp set)
// are intentionally NOT propagated to the cache/manager/watchers: teardown is
// handled by Reconcile via the terminating path instead.
func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
	oldCq, match := e.ObjectOld.(*kueue.ClusterQueue)
	if !match {
		// No need to interact with the cache for other objects.
		return true
	}
	newCq, match := e.ObjectNew.(*kueue.ClusterQueue)
	if !match {
		// No need to interact with the cache for other objects.
		return true
	}
	log := r.log.WithValues("clusterQueue", klog.KObj(newCq))
	log.V(2).Info("ClusterQueue update event")
	if newCq.DeletionTimestamp != nil {
		// Deletion in progress; let Reconcile drive termination.
		return true
	}
	defer r.notifyWatchers(oldCq, newCq)
	if err := r.cache.UpdateClusterQueue(newCq); err != nil {
		log.Error(err, "Failed to update clusterQueue in cache")
	}
	if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
		log.Error(err, "Failed to update clusterQueue in queue manager")
	}
	return true
}
// Generic logs channel-sourced events (workload/flavor updates) and lets them
// through to the handlers.
func (r *ClusterQueueReconciler) Generic(e event.GenericEvent) bool {
	obj := e.Object
	r.log.V(2).Info("Got generic event", "obj", klog.KObj(obj), "kind", obj.GetObjectKind().GroupVersionKind())
	return true
}
// cqWorkloadHandler signals the controller to reconcile the ClusterQueue
// associated to the workload in the event.
// Since the events come from a channel Source, only the Generic handler will
// receive events.
type cqWorkloadHandler struct {
	// qManager resolves which ClusterQueue a not-yet-admitted workload
	// belongs to.
	qManager *queue.Manager
}
// Create is a no-op: workload events only arrive through the channel Source,
// which delivers them to Generic.
func (h *cqWorkloadHandler) Create(event.CreateEvent, workqueue.RateLimitingInterface) {
}

// Update is a no-op; see Create.
func (h *cqWorkloadHandler) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {
}

// Delete is a no-op; see Create.
func (h *cqWorkloadHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
}
// Generic maps a workload event to its ClusterQueue and enqueues a reconcile
// request for it, batched by UpdatesBatchPeriod.
func (h *cqWorkloadHandler) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) {
	wl := e.Object.(*kueue.Workload)
	if req := h.requestForWorkloadClusterQueue(wl); req != nil {
		q.AddAfter(*req, constants.UpdatesBatchPeriod)
	}
}
// requestForWorkloadClusterQueue resolves the ClusterQueue a workload belongs
// to: for admitted workloads it is taken from the admission status, otherwise
// from the queue manager. Returns nil if the queue cannot be determined.
func (h *cqWorkloadHandler) requestForWorkloadClusterQueue(w *kueue.Workload) *reconcile.Request {
	var cqName string
	if workload.IsAdmitted(w) {
		cqName = string(w.Status.Admission.ClusterQueue)
	} else {
		resolved, found := h.qManager.ClusterQueueForWorkload(w)
		if !found {
			return nil
		}
		cqName = resolved
	}
	req := reconcile.Request{NamespacedName: types.NamespacedName{Name: cqName}}
	return &req
}
// cqNamespaceHandler handles namespace update events.
type cqNamespaceHandler struct {
	// qManager requeues inadmissible workloads when a namespace becomes
	// eligible for additional ClusterQueues.
	qManager *queue.Manager
	// cache matches namespace labels against ClusterQueue namespace selectors.
	cache *cache.Cache
}
// Create is a no-op: only namespace label updates can change which
// ClusterQueues match, so only Update does work.
func (h *cqNamespaceHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) {
}
// Update requeues inadmissible workloads for every ClusterQueue that starts
// matching the namespace as a result of this label change (queues matching
// both before and after are unaffected).
func (h *cqNamespaceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	oldNs := e.ObjectOld.(*corev1.Namespace)
	newNs := e.ObjectNew.(*corev1.Namespace)
	oldMatchingCqs := h.cache.MatchingClusterQueues(oldNs.Labels)
	newMatchingCqs := h.cache.MatchingClusterQueues(newNs.Labels)
	// Queues matching the new labels but not the old ones; use the sets
	// package's Difference instead of a hand-rolled loop.
	cqs := newMatchingCqs.Difference(oldMatchingCqs)
	h.qManager.QueueInadmissibleWorkloads(context.Background(), cqs)
}
// Delete is a no-op: a deleted namespace cannot make new workloads admissible.
func (h *cqNamespaceHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
}

// Generic is a no-op: namespaces are watched via an informer Source, which
// never emits generic events here.
func (h *cqNamespaceHandler) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {
}
// cqResourceFlavorHandler enqueues reconciles for the ClusterQueues that
// reference a changed ResourceFlavor. Events arrive via a channel Source, so
// only Generic receives them.
type cqResourceFlavorHandler struct {
	// cache knows which ClusterQueues reference a given flavor.
	cache *cache.Cache
}
// Create is a no-op: flavor events only arrive through the channel Source,
// which delivers them to Generic.
func (h *cqResourceFlavorHandler) Create(event.CreateEvent, workqueue.RateLimitingInterface) {
}

// Update is a no-op; see Create.
func (h *cqResourceFlavorHandler) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {
}

// Delete is a no-op; see Create.
func (h *cqResourceFlavorHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
}
// Generic enqueues a reconcile request for every ClusterQueue that uses the
// flavor carried by the event. Non-flavor objects are ignored.
func (h *cqResourceFlavorHandler) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) {
	rf, isFlavor := e.Object.(*kueue.ResourceFlavor)
	if !isFlavor {
		return
	}
	// Ranging over an empty result is a no-op, so no emptiness guard needed.
	for _, cqName := range h.cache.ClusterQueuesUsingFlavor(rf.Name) {
		q.Add(reconcile.Request{
			NamespacedName: types.NamespacedName{Name: cqName},
		})
	}
}
// SetupWithManager sets up the controller with the Manager.
// It watches ClusterQueues directly, namespaces via an informer, and workload
// and flavor updates via the reconciler's channels; the reconciler itself
// serves as the event filter.
func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
	workloadHandler := cqWorkloadHandler{qManager: r.qManager}
	namespaceHandler := cqNamespaceHandler{
		qManager: r.qManager,
		cache:    r.cache,
	}
	flavorHandler := cqResourceFlavorHandler{cache: r.cache}
	return ctrl.NewControllerManagedBy(mgr).
		For(&kueue.ClusterQueue{}).
		Watches(&source.Kind{Type: &corev1.Namespace{}}, &namespaceHandler).
		Watches(&source.Channel{Source: r.wlUpdateCh}, &workloadHandler).
		Watches(&source.Channel{Source: r.rfUpdateCh}, &flavorHandler).
		WithEventFilter(r).
		Complete(r)
}
// updateCqStatusIfChanged refreshes the ClusterQueue status (usage, workload
// counts and the Active condition) and issues a status update only when the
// status actually changed.
func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
	ctx context.Context,
	cq *kueue.ClusterQueue,
	conditionStatus metav1.ConditionStatus,
	reason, msg string,
) error {
	oldStatus := cq.Status.DeepCopy()
	pendingWorkloads := r.qManager.Pending(cq)
	usage, workloads, err := r.cache.Usage(cq)
	if err != nil {
		r.log.Error(err, "Failed getting usage from cache")
		// This is likely because the cluster queue was recently removed,
		// but we didn't process that event yet.
		return err
	}
	cq.Status.FlavorsUsage = usage
	cq.Status.AdmittedWorkloads = int32(workloads)
	cq.Status.PendingWorkloads = int32(pendingWorkloads)
	meta.SetStatusCondition(&cq.Status.Conditions, metav1.Condition{
		Type:    kueue.ClusterQueueActive,
		Status:  conditionStatus,
		Reason:  reason,
		Message: msg,
	})
	// BUG FIX: compare &cq.Status (pointer) against the DeepCopy pointer.
	// The previous value-vs-pointer comparison could never be semantically
	// equal, so a status Update was issued on every single reconcile.
	if !equality.Semantic.DeepEqual(&cq.Status, oldStatus) {
		return r.client.Status().Update(ctx, cq)
	}
	return nil
}