in pkg/controllers/jobset_controller.go [82:164]
// Reconcile runs one reconciliation pass for the JobSet named by req.
//
// The pass proceeds in order and returns as soon as a step fails or settles
// the outcome:
//  1. Fetch the JobSet; a not-found error is ignored.
//  2. Collect the child jobs owned by the JobSet.
//  3. If the JobSet is already finished, delete its active child jobs and stop.
//  4. Delete the child jobs marked for deletion.
//  5. If any child job failed, execute the failure policy and stop.
//  6. If any child job succeeded, execute the success policy and stop when it
//     reports completion.
//  7. Create jobs, then suspend or resume depending on Spec.Suspend.
//  8. Calculate and update the ReplicatedJob statuses.
//
// Apart from the initial fetch, every failing step is logged and its error is
// returned to the caller alongside an empty ctrl.Result.
func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Fetch the JobSet from the apiserver; if it is gone there is nothing to do.
	js := new(jobset.JobSet)
	err := r.Get(ctx, req.NamespacedName, js)
	if err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Attach the JobSet identity to the logger and propagate it through ctx.
	logger := ctrl.LoggerFrom(ctx).WithValues("jobset", klog.KObj(js))
	ctx = ctrl.LoggerInto(ctx, logger)
	logger.V(2).Info("Reconciling JobSet")

	// Collect the jobs owned by this JobSet.
	children, err := r.getChildJobs(ctx, js)
	if err != nil {
		logger.Error(err, "getting jobs owned by jobset")
		return ctrl.Result{}, err
	}

	// A JobSet that already completed or failed only needs its active child
	// jobs cleaned up; nothing further happens in this pass.
	if jobSetFinished(js) {
		err = r.deleteJobs(ctx, children.active)
		if err != nil {
			logger.Error(err, "deleting jobs")
		}
		return ctrl.Result{}, err
	}

	// Remove any jobs that were marked for deletion.
	err = r.deleteJobs(ctx, children.delete)
	if err != nil {
		logger.Error(err, "deleting jobs")
		return ctrl.Result{}, err
	}

	// Failed jobs take priority: run the failure policy (if any) and end the
	// pass regardless of its outcome.
	if len(children.failed) > 0 {
		err = r.executeFailurePolicy(ctx, js, children)
		if err != nil {
			logger.Error(err, "executing failure policy")
		}
		return ctrl.Result{}, err
	}

	// With successful jobs present, run the success policy; the pass ends
	// here only on error or when the policy reports completion.
	if len(children.successful) > 0 {
		done, err := r.executeSuccessPolicy(ctx, js, children)
		switch {
		case err != nil:
			logger.Error(err, "executing success policy")
			return ctrl.Result{}, err
		case done:
			return ctrl.Result{}, nil
		}
	}

	// Neither failed nor completed: keep creating any jobs that are ready to
	// be started.
	err = r.createJobs(ctx, js, children)
	if err != nil {
		logger.Error(err, "creating jobs")
		return ctrl.Result{}, err
	}

	// Suspend the JobSet when Spec.Suspend is set and true; otherwise resume
	// it if necessary.
	if suspend := js.Spec.Suspend; suspend != nil && *suspend {
		err = r.suspendJobSet(ctx, js, children)
		if err != nil {
			logger.Error(err, "suspending jobset")
			return ctrl.Result{}, err
		}
	} else {
		err = r.resumeJobSetIfNecessary(ctx, js, children)
		if err != nil {
			logger.Error(err, "resuming jobset")
			return ctrl.Result{}, err
		}
	}

	// Finally, calculate JobsReady and update the status of each ReplicatedJob.
	err = r.calculateAndUpdateReplicatedJobsStatuses(ctx, js, children)
	if err != nil {
		logger.Error(err, "updating replicated jobs statuses")
	}
	return ctrl.Result{}, err
}