func()

in pkg/controllers/jobset_controller.go [82:164]


// Reconcile runs one reconciliation pass for the JobSet named in req.
//
// The pass, in order:
//  1. Fetches the JobSet. A NotFound error is swallowed because there is
//     nothing left to reconcile.
//  2. Lists the child Jobs it owns.
//  3. If the JobSet is already finished, deletes its active children and stops.
//  4. Deletes children marked for deletion.
//  5. Applies the failure policy (and stops) if any child failed.
//  6. Otherwise applies the success policy if any child succeeded, and stops
//     when that policy reports completion.
//  7. Creates any Jobs that are ready to start.
//  8. Suspends the JobSet, or resumes it when spec.suspend is not true.
//  9. Recomputes the per-ReplicatedJob statuses.
//
// Any error from a step is logged and returned with an empty ctrl.Result.
func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var js jobset.JobSet
	if getErr := r.Get(ctx, req.NamespacedName, &js); getErr != nil {
		// A deleted JobSet leaves nothing to do; other errors are surfaced.
		return ctrl.Result{}, client.IgnoreNotFound(getErr)
	}

	// Attach the JobSet identity to the logger and propagate it via ctx so
	// downstream helpers log with the same key/value pairs.
	logger := ctrl.LoggerFrom(ctx).WithValues("jobset", klog.KObj(&js))
	ctx = ctrl.LoggerInto(ctx, logger)
	logger.V(2).Info("Reconciling JobSet")

	// fail logs err under msg and returns it alongside an empty Result.
	fail := func(err error, msg string) (ctrl.Result, error) {
		logger.Error(err, msg)
		return ctrl.Result{}, err
	}

	children, err := r.getChildJobs(ctx, &js)
	if err != nil {
		return fail(err, "getting jobs owned by jobset")
	}

	// Terminal JobSets only need their still-active children removed.
	if jobSetFinished(&js) {
		if err := r.deleteJobs(ctx, children.active); err != nil {
			return fail(err, "deleting jobs")
		}
		return ctrl.Result{}, nil
	}

	if err := r.deleteJobs(ctx, children.delete); err != nil {
		return fail(err, "deleting jobs")
	}

	// Failure takes precedence over success: once the failure policy has
	// run, this pass ends.
	if len(children.failed) > 0 {
		if err := r.executeFailurePolicy(ctx, &js, children); err != nil {
			return fail(err, "executing failure policy")
		}
		return ctrl.Result{}, nil
	}

	if len(children.successful) > 0 {
		done, policyErr := r.executeSuccessPolicy(ctx, &js, children)
		if policyErr != nil {
			return fail(policyErr, "executing success policy")
		}
		if done {
			return ctrl.Result{}, nil
		}
	}

	// Still running: start whatever Jobs are ready.
	if err := r.createJobs(ctx, &js, children); err != nil {
		return fail(err, "creating jobs")
	}

	// A nil spec.suspend is treated the same as false.
	switch {
	case js.Spec.Suspend != nil && *js.Spec.Suspend:
		if err := r.suspendJobSet(ctx, &js, children); err != nil {
			return fail(err, "suspending jobset")
		}
	default:
		if err := r.resumeJobSetIfNecessary(ctx, &js, children); err != nil {
			return fail(err, "resuming jobset")
		}
	}

	// Calculate JobsReady and update statuses for each ReplicatedJob.
	if err := r.calculateAndUpdateReplicatedJobsStatuses(ctx, &js, children); err != nil {
		return fail(err, "updating replicated jobs statuses")
	}

	return ctrl.Result{}, nil
}