in pkg/controller/jobframework/reconciler.go [97:237]
// ReconcileGenericJob drives a single job towards the state dictated by its
// workload. It fetches the job named by req into job.Object() and then walks
// through the numbered steps below, returning as soon as one of them has
// performed (or attempted) an action.
//
// The returned ctrl.Result is always empty; a non-nil error reports the step
// that failed so that the request can be retried by the caller.
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
	object := job.Object()
	if err := r.client.Get(ctx, req.NamespacedName, object); err != nil {
		// we'll ignore not-found errors, since there is nothing to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Attach the job identity to the logger and propagate it through ctx so
	// that helpers called with ctx log with the same key.
	namespacedName := types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}
	log := ctrl.LoggerFrom(ctx).WithValues("job", namespacedName.String())
	ctx = ctrl.LoggerInto(ctx, log)

	// A job without a parent workload name is treated as standalone.
	isStandaloneJob := ParentWorkloadName(job) == ""

	// when manageJobsWithoutQueueName is disabled we only reconcile jobs that have either
	// queue-name or the parent-workload annotation set.
	if !r.manageJobsWithoutQueueName && QueueName(job) == "" && isStandaloneJob {
		log.V(3).Info(fmt.Sprintf("Neither %s label, nor %s annotation is set, ignoring the job", QueueLabel, ParentWorkloadAnnotation))
		return ctrl.Result{}, nil
	}

	log.V(2).Info("Reconciling Job")

	// 1. make sure there is only a single existing instance of the workload.
	// If no workload exists and the job is unsuspended, we'll stop it immediately.
	wl, err := r.ensureOneWorkload(ctx, job, object)
	if err != nil {
		log.Error(err, "Getting existing workloads")
		return ctrl.Result{}, err
	}

	// 2. handle job is finished.
	if condition, finished := job.Finished(); finished {
		// Nothing to record when there is no workload or it is already marked finished.
		if wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
			return ctrl.Result{}, nil
		}
		err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
		if client.IgnoreNotFound(err) != nil {
			// Surface the failure instead of swallowing it: returning nil here
			// would leave the workload without the finished condition until an
			// unrelated event triggers another reconcile.
			log.Error(err, "Updating workload status")
			return ctrl.Result{}, err
		}
		// Success, or the workload no longer exists and there is nothing to update.
		return ctrl.Result{}, nil
	}

	// 3. handle workload is nil.
	if wl == nil {
		// Only standalone jobs are handled here; jobs with a parent workload are left alone.
		if !isStandaloneJob {
			return ctrl.Result{}, nil
		}
		err := r.handleJobWithNoWorkload(ctx, job, object)
		if err != nil {
			log.Error(err, "Handling job with no workload")
		}
		return ctrl.Result{}, err
	}

	// 4. update reclaimable counts.
	if rp := job.ReclaimablePods(); !workload.ReclaimablePodsAreEqual(rp, wl.Status.ReclaimablePods) {
		err = workload.UpdateReclaimablePods(ctx, r.client, wl, rp)
		if err != nil {
			log.Error(err, "Updating reclaimable pods")
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// 5. handle WaitForPodsReady only for a standalone job.
	if isStandaloneJob {
		// handle a job when waitForPodsReady is enabled, and it is the main job
		if r.waitForPodsReady {
			log.V(5).Info("Handling a job when waitForPodsReady is enabled")
			condition := generatePodsReadyCondition(job, wl)
			// optimization to avoid sending the update request if the status didn't change
			if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) {
				log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status))
				apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
				// A failure here is only logged; reconciliation continues with
				// the remaining steps regardless of the outcome.
				err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
				if err != nil {
					log.Error(err, "Updating workload status")
				}
			}
		}
	}

	// 6. handle eviction
	if evCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); evCond != nil && evCond.Status == metav1.ConditionTrue {
		if !job.IsSuspended() {
			log.V(6).Info("The job is not suspended, stop")
			return ctrl.Result{}, r.stopJob(ctx, job, object, wl, evCond.Message)
		}
		if workload.IsAdmitted(wl) {
			if !job.IsActive() {
				log.V(6).Info("The job is no longer active, clear the workloads admission")
				workload.UnsetAdmissionWithCondition(wl, "Pending", evCond.Message)
				return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
			}
			// The job is suspended but active, nothing to do now.
			return ctrl.Result{}, nil
		}
		// Evicted, suspended and not admitted: fall through to step 7.
	}

	// 7. handle job is suspended.
	if job.IsSuspended() {
		// start the job if the workload has been admitted, and the job is still suspended
		if workload.IsAdmitted(wl) {
			log.V(2).Info("Job admitted, unsuspending")
			err := r.startJob(ctx, job, object, wl)
			if err != nil {
				log.Error(err, "Unsuspending job")
			}
			return ctrl.Result{}, err
		}

		// update queue name if changed.
		q := QueueName(job)
		if wl.Spec.QueueName != q && isStandaloneJob {
			log.V(2).Info("Job changed queues, updating workload")
			wl.Spec.QueueName = q
			err := r.client.Update(ctx, wl)
			if err != nil {
				log.Error(err, "Updating workload queue")
			}
			return ctrl.Result{}, err
		}
		log.V(3).Info("Job is suspended and workload not yet admitted by a clusterQueue, nothing to do")
		return ctrl.Result{}, nil
	}

	// 8. handle job is unsuspended.
	if !workload.IsAdmitted(wl) {
		// the job must be suspended if the workload is not yet admitted.
		log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
		err := r.stopJob(ctx, job, object, wl, "Not admitted by cluster queue")
		if err != nil {
			log.Error(err, "Suspending job with non admitted workload")
		}
		return ctrl.Result{}, err
	}

	// workload is admitted and job is running, nothing to do.
	log.V(3).Info("Job running with admitted workload, nothing to do")
	return ctrl.Result{}, nil
}