in pkg/controller/jobframework/reconciler.go [242:320]
// ensureOneWorkload returns the single Workload that corresponds to job, or
// nil when there is none.
//
// The corresponding workload is either the parent workload named by
// ParentWorkloadName(job), when that name is set and the object exists, or
// otherwise the first child workload (listed in object's namespace with the
// getOwnerKey(job.GetGVK()) field selector) accepted by r.equivalentToWorkload.
// Every other listed child workload is deleted, and a "DeletedWorkload" event
// is recorded on object for each successful deletion.
//
// When nothing corresponds and the job is not suspended, the job is stopped
// before any deletion; if exactly one child workload was listed it is passed
// to stopJob.
//
// An error is returned when:
//   - fetching the parent workload fails with anything other than NotFound,
//   - listing the child workloads fails,
//   - stopping the job fails,
//   - any non-matching workload still existed when deletion was attempted
//     (deleted successfully or failed with a non-NotFound error).
//
// In the last case the caller is expected to retry.
func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) {
	log := ctrl.LoggerFrom(ctx)

	// Find a matching workload first if there is one.
	var toDelete []*kueue.Workload
	var match *kueue.Workload
	if pwName := ParentWorkloadName(job); pwName != "" {
		pw := kueue.Workload{}
		namespacedName := types.NamespacedName{
			Name:      pwName,
			Namespace: object.GetNamespace(),
		}
		if err := r.client.Get(ctx, namespacedName, &pw); err != nil {
			if !apierrors.IsNotFound(err) {
				return nil, err
			}
			log.V(2).Info("job with no matching parent workload", "parent-workload", pwName)
		} else {
			match = &pw
		}
	}

	var workloads kueue.WorkloadList
	if err := r.client.List(ctx, &workloads, client.InNamespace(object.GetNamespace()),
		client.MatchingFields{getOwnerKey(job.GetGVK()): object.GetName()}); err != nil {
		log.Error(err, "Unable to list child workloads")
		return nil, err
	}

	// Keep the first equivalent child unless the parent workload already
	// matched; every other child is scheduled for deletion.
	for i := range workloads.Items {
		w := &workloads.Items[i]
		if match == nil && r.equivalentToWorkload(job, object, w) {
			match = w
		} else {
			toDelete = append(toDelete, w)
		}
	}

	// If there is no matching workload and the job is running, suspend it.
	if match == nil && !job.IsSuspended() {
		log.V(2).Info("job with no matching workload, suspending")
		var w *kueue.Workload
		if len(workloads.Items) == 1 {
			// The job may have been modified and hence the existing workload
			// doesn't match the job anymore. All bets are off if there are more
			// than one workload...
			w = &workloads.Items[0]
		}
		if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil {
			// Surface the failure instead of only logging it. Otherwise this
			// function can return (nil, nil) for a job that is still running,
			// and the caller would proceed as if nothing went wrong.
			return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
		}
	}

	// Delete duplicate workload instances.
	existedWls := 0
	for _, wl := range toDelete {
		err := r.client.Delete(ctx, wl)
		switch {
		case err == nil:
			existedWls++
			r.record.Eventf(object, corev1.EventTypeNormal, "DeletedWorkload",
				"Deleted not matching Workload: %v", workload.Key(wl))
		case apierrors.IsNotFound(err):
			// Already gone; it does not count as an existing duplicate.
		default:
			// The workload presumably still exists; count it so an error is
			// returned below and the reconcile is retried.
			existedWls++
			log.Error(err, "Failed to delete workload")
		}
	}

	if existedWls != 0 {
		if match == nil {
			return nil, fmt.Errorf("no matching workload was found, tried deleting %d existing workload(s)", existedWls)
		}
		// Count the match plus every other candidate. When match is the
		// parent workload it is not part of workloads.Items, so
		// len(workloads.Items) would under-report by one.
		return nil, fmt.Errorf("only one workload should exist, found %d", len(toDelete)+1)
	}
	return match, nil
}