func()

in pkg/controller/jobframework/reconciler.go [242:320]


// ensureOneWorkload finds the single Workload that corresponds to job and
// deletes any other Workloads owned by it.
//
// The match is chosen in this order of preference:
//   - the parent Workload named by ParentWorkloadName(job), if it exists in
//     the job's namespace. In that case every Workload owned by the job is
//     treated as extra.
//   - otherwise, the first owned Workload for which equivalentToWorkload
//     reports true.
//
// If no match is found and the job is not suspended, the job is stopped via
// stopJob. Every owned Workload that is not the match is then deleted.
// Deletions that fail with NotFound are ignored.
//
// It returns the matching Workload, or nil when there is none. If any extra
// Workload was actually deleted, it returns a nil Workload and a non-nil
// error. It also returns an error when fetching the parent Workload, listing
// owned Workloads, stopping the job or deleting a Workload fails.
func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) {
	log := ctrl.LoggerFrom(ctx)

	// Find a matching workload first if there is one.
	var toDelete []*kueue.Workload
	var match *kueue.Workload

	if pwName := ParentWorkloadName(job); pwName != "" {
		pw := kueue.Workload{}
		namespacedName := types.NamespacedName{
			Name:      pwName,
			Namespace: object.GetNamespace(),
		}
		if err := r.client.Get(ctx, namespacedName, &pw); err != nil {
			if !apierrors.IsNotFound(err) {
				return nil, err
			}
			log.V(2).Info("job with no matching parent workload", "parent-workload", pwName)
		} else {
			match = &pw
		}
	}

	var workloads kueue.WorkloadList
	if err := r.client.List(ctx, &workloads, client.InNamespace(object.GetNamespace()),
		client.MatchingFields{getOwnerKey(job.GetGVK()): object.GetName()}); err != nil {
		log.Error(err, "Unable to list child workloads")
		return nil, err
	}

	// Keep the first equivalent workload (unless the parent already matched);
	// everything else owned by the job is scheduled for deletion.
	for i := range workloads.Items {
		w := &workloads.Items[i]
		if match == nil && r.equivalentToWorkload(job, object, w) {
			match = w
		} else {
			toDelete = append(toDelete, w)
		}
	}

	// If there is no matching workload and the job is running, suspend it.
	// A failure here is returned rather than only logged. Otherwise, when the
	// job owns no workloads, this function would report success (nil error)
	// while the job keeps running without a matching Workload.
	if match == nil && !job.IsSuspended() {
		log.V(2).Info("job with no matching workload, suspending")
		var w *kueue.Workload
		if len(workloads.Items) == 1 {
			// The job may have been modified and hence the existing workload
			// doesn't match the job anymore. All bets are off if there are more
			// than one workload...
			w = &workloads.Items[0]
		}
		if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil {
			return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
		}
	}

	// Delete duplicate workload instances. A workload that is already gone
	// (NotFound) does not count as existing. Any other failure is returned
	// with its cause instead of being folded into a generic error.
	deleted := 0
	for _, wl := range toDelete {
		if err := r.client.Delete(ctx, wl); err != nil {
			if apierrors.IsNotFound(err) {
				continue
			}
			return nil, fmt.Errorf("deleting not matching workload %v: %w", workload.Key(wl), err)
		}
		deleted++
		r.record.Eventf(object, corev1.EventTypeNormal, "DeletedWorkload",
			"Deleted not matching Workload: %v", workload.Key(wl))
	}

	if deleted != 0 {
		if match == nil {
			return nil, fmt.Errorf("no matching workload was found, tried deleting %d existing workload(s)", deleted)
		}
		// The match itself plus every extra workload that still existed.
		// This count stays correct when the match is the parent workload
		// (not part of the owned list) and when some extras were already gone.
		return nil, fmt.Errorf("only one workload should exist, found %d", deleted+1)
	}

	return match, nil
}