func()

in pkg/controller/jobframework/reconciler.go [97:237]


// ReconcileGenericJob runs a single reconciliation pass for the object named
// by req, using job as the framework-agnostic view of that object.
//
// The pass walks through the steps below and usually returns from the first
// one that applies:
//  1. resolve the workload that belongs to the job (ensureOneWorkload);
//  2. if the job has finished, copy its terminal condition onto the workload;
//  3. if there is no workload, delegate to handleJobWithNoWorkload
//     (standalone jobs only);
//  4. sync the job's reclaimable pods into the workload status;
//  5. keep the PodsReady condition current when waitForPodsReady is enabled
//     (standalone jobs only);
//  6. react to a workload whose Evicted condition is true;
//  7. handle a suspended job: start it once the workload is admitted, or
//     propagate a changed queue name to the workload;
//  8. stop a running job whose workload is not admitted.
//
// Jobs that have neither a queue name nor a parent workload are ignored unless
// manageJobsWithoutQueueName is set. A not-found error while fetching the
// object is treated as success. Any other returned error is meant to make the
// caller retry the request.
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
	object := job.Object()
	if err := r.client.Get(ctx, req.NamespacedName, object); err != nil {
		// we'll ignore not-found errors, since there is nothing to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	namespacedName := types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}

	// Attach the job identity to the logger and push it back into ctx so that
	// the helpers called below log with the same key.
	log := ctrl.LoggerFrom(ctx).WithValues("job", namespacedName.String())
	ctx = ctrl.LoggerInto(ctx, log)

	isStandaloneJob := ParentWorkloadName(job) == ""

	// when manageJobsWithoutQueueName is disabled we only reconcile jobs that have either
	// queue-name or the parent-workload annotation set.
	if !r.manageJobsWithoutQueueName && QueueName(job) == "" && isStandaloneJob {
		log.V(3).Info(fmt.Sprintf("Neither %s label, nor %s annotation is set, ignoring the job", QueueLabel, ParentWorkloadAnnotation))
		return ctrl.Result{}, nil
	}

	log.V(2).Info("Reconciling Job")

	// 1. make sure there is only a single existing instance of the workload.
	// If no workload exists and the job is unsuspended, we'll stop it immediately.
	wl, err := r.ensureOneWorkload(ctx, job, object)
	if err != nil {
		log.Error(err, "Getting existing workloads")
		return ctrl.Result{}, err
	}

	// 2. handle job is finished.
	if condition, finished := job.Finished(); finished {
		if wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
			return ctrl.Result{}, nil
		}
		if err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName); err != nil {
			log.Error(err, "Updating workload status")
			// Surface the failure so the request is retried: a finished job
			// may not produce another event, and dropping the error here
			// could leave the workload without its finished condition. A
			// workload that no longer exists has nothing left to update.
			return ctrl.Result{}, client.IgnoreNotFound(err)
		}
		return ctrl.Result{}, nil
	}

	// 3. handle workload is nil.
	if wl == nil {
		// A job with a parent workload does not own a workload of its own.
		if !isStandaloneJob {
			return ctrl.Result{}, nil
		}
		err := r.handleJobWithNoWorkload(ctx, job, object)
		if err != nil {
			log.Error(err, "Handling job with no workload")
		}
		return ctrl.Result{}, err
	}

	// 4. update reclaimable counts.
	if rp := job.ReclaimablePods(); !workload.ReclaimablePodsAreEqual(rp, wl.Status.ReclaimablePods) {
		err = workload.UpdateReclaimablePods(ctx, r.client, wl, rp)
		if err != nil {
			log.Error(err, "Updating reclaimable pods")
			return ctrl.Result{}, err
		}
		// The pass ends here after a successful update; the remaining steps
		// are left to a later reconcile.
		return ctrl.Result{}, nil
	}

	// 5. handle WaitForPodsReady only for a standalone job.
	if isStandaloneJob {
		// handle a job when waitForPodsReady is enabled, and it is the main job
		if r.waitForPodsReady {
			log.V(5).Info("Handling a job when waitForPodsReady is enabled")
			condition := generatePodsReadyCondition(job, wl)
			// optimization to avoid sending the update request if the status didn't change
			if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) {
				log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status))
				apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
				err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
				if err != nil {
					// NOTE(review): the failure is only logged so that the
					// eviction/suspension handling below still runs in this
					// pass; presumably best-effort on purpose - confirm.
					log.Error(err, "Updating workload status")
				}
			}
		}
	}

	// 6. handle eviction
	if evCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); evCond != nil && evCond.Status == metav1.ConditionTrue {
		if !job.IsSuspended() {
			log.V(6).Info("The job is not suspended, stop")
			return ctrl.Result{}, r.stopJob(ctx, job, object, wl, evCond.Message)
		}
		if workload.IsAdmitted(wl) {
			if !job.IsActive() {
				log.V(6).Info("The job is no longer active, clear the workloads admission")
				workload.UnsetAdmissionWithCondition(wl, "Pending", evCond.Message)
				return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
			}
			// The job is suspended but active, nothing to do now.
			return ctrl.Result{}, nil
		}
		// Evicted, suspended and no longer admitted: fall through to step 7.
	}

	// 7. handle job is suspended.
	if job.IsSuspended() {
		// start the job if the workload has been admitted, and the job is still suspended
		if workload.IsAdmitted(wl) {
			log.V(2).Info("Job admitted, unsuspending")
			err := r.startJob(ctx, job, object, wl)
			if err != nil {
				log.Error(err, "Unsuspending job")
			}
			return ctrl.Result{}, err
		}

		// update queue name if changed.
		q := QueueName(job)
		if wl.Spec.QueueName != q && isStandaloneJob {
			log.V(2).Info("Job changed queues, updating workload")
			wl.Spec.QueueName = q
			err := r.client.Update(ctx, wl)
			if err != nil {
				log.Error(err, "Updating workload queue")
			}
			return ctrl.Result{}, err
		}
		log.V(3).Info("Job is suspended and workload not yet admitted by a clusterQueue, nothing to do")
		return ctrl.Result{}, nil
	}

	// 8. handle job is unsuspended.
	if !workload.IsAdmitted(wl) {
		// the job must be suspended if the workload is not yet admitted.
		log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
		err := r.stopJob(ctx, job, object, wl, "Not admitted by cluster queue")
		if err != nil {
			log.Error(err, "Suspending job with non admitted workload")
		}
		return ctrl.Result{}, err
	}

	// workload is admitted and job is running, nothing to do.
	log.V(3).Info("Job running with admitted workload, nothing to do")
	return ctrl.Result{}, nil
}