in pkg/controller/core/workload_controller.go [249:330]
func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
oldWl, isWorkload := e.ObjectOld.(*kueue.Workload)
if !isWorkload {
// this event will be handled by the LimitRange/RuntimeClass handle
return true
}
wl := e.ObjectNew.(*kueue.Workload)
defer r.notifyWatchers(oldWl, wl)
status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)
prevQueue := oldWl.Spec.QueueName
if prevQueue != wl.Spec.QueueName {
log = log.WithValues("prevQueue", prevQueue)
}
prevStatus := workloadStatus(oldWl)
if prevStatus != status {
log = log.WithValues("prevStatus", prevStatus)
}
if workload.IsAdmitted(wl) {
log = log.WithValues("clusterQueue", wl.Status.Admission.ClusterQueue)
}
if workload.IsAdmitted(oldWl) && (!workload.IsAdmitted(wl) || wl.Status.Admission.ClusterQueue != oldWl.Status.Admission.ClusterQueue) {
log = log.WithValues("prevClusterQueue", oldWl.Status.Admission.ClusterQueue)
}
log.V(2).Info("Workload update event")
wlCopy := wl.DeepCopy()
// We do not handle old workload here as it will be deleted or replaced by new one anyway.
r.adjustResources(log, wlCopy)
switch {
case status == finished:
// The workload could have been in the queues if we missed an event.
r.queues.DeleteWorkload(wl)
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
// to guarantee that requeueued workloads are taken into account before
// the next scheduling cycle.
if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
log.Error(err, "Failed to delete workload from cache")
}
})
case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now")
}
case prevStatus == pending && status == admitted:
r.queues.DeleteWorkload(oldWl)
if !r.cache.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("ClusterQueue for workload didn't exist; ignored for now")
}
case prevStatus == admitted && status == pending:
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
// to guarantee that requeueued workloads are taken into account before
// the next scheduling cycle.
if err := r.cache.DeleteWorkload(wl); err != nil {
log.Error(err, "Failed to delete workload from cache")
}
})
if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
default:
// Workload update in the cache is handled here; however, some fields are immutable
// and are not supposed to actually change anything.
if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
log.Error(err, "Updating workload in cache")
}
}
return true
}