in pkg/controllers/jobset_controller.go [311:349]
// resumeJobSetIfNecessary unsuspends every active child Job that is still
// suspended and then asks ensureCondition to record the JobSetSuspended
// condition as False on the JobSet.
//
// For each suspended Job in ownedJobs.active it:
//  1. clears Status.StartTime with a status update, if it is set;
//  2. replaces the Job's pod-template nodeSelector with the one from the
//     replicatedJob template named by the Job's ReplicatedJobNameKey label;
//  3. sets Spec.Suspend to false and updates the Job.
//
// The first failing update aborts the loop and its error is returned, so
// later Jobs are left untouched. Otherwise the result of ensureCondition is
// returned.
func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
	log := ctrl.LoggerFrom(ctx)

	// Index each replicatedJob's pod-template nodeSelector by replicatedJob name.
	nodeSelectors := make(map[string]map[string]string, len(js.Spec.ReplicatedJobs))
	for _, replicatedJob := range js.Spec.ReplicatedJobs {
		nodeSelectors[replicatedJob.Name] = replicatedJob.Template.Spec.Template.Spec.NodeSelector
	}

	// The JobSet is unsuspended: ensure all active child Jobs are unsuspended
	// too, then set the JobSetSuspended condition to false.
	for _, job := range ownedJobs.active {
		if !pointer.BoolDeref(job.Spec.Suspend, false) {
			// Already running; nothing to resume.
			continue
		}

		// Reset the start time before touching the pod template.
		// NOTE(review): presumably required because the API server only allows
		// scheduling directives of a suspended Job to change while it has no
		// start time — confirm against the batch/v1 Job update validation.
		if job.Status.StartTime != nil {
			job.Status.StartTime = nil
			if err := r.Status().Update(ctx, job); err != nil {
				return err
			}
		}

		// When resuming a job, its nodeSelectors should match that of the
		// replicatedJob template that it was created from, which may have been
		// updated while it was suspended. Indexing a nil Labels map is safe and
		// yields "".
		replicatedJobName := job.Labels[jobset.ReplicatedJobNameKey]
		if replicatedJobName == "" {
			log.Error(nil, "job missing ReplicatedJobName label", "job", job.Name, "namespace", job.Namespace)
		} else if selector, found := nodeSelectors[replicatedJobName]; found {
			job.Spec.Template.Spec.NodeSelector = selector
		} else {
			// Unknown replicatedJob: keep the Job's current nodeSelector rather
			// than silently wiping it with the nil a failed map lookup returns.
			log.Error(nil, "job references a replicatedJob that is not in the JobSet spec",
				"job", job.Name, "namespace", job.Namespace, "replicatedJob", replicatedJobName)
		}

		job.Spec.Suspend = pointer.Bool(false)
		if err := r.Update(ctx, job); err != nil {
			return err
		}
	}

	return r.ensureCondition(ctx, js, corev1.EventTypeNormal, metav1.Condition{
		Type:               string(jobset.JobSetSuspended),
		Status:             metav1.ConditionStatus(corev1.ConditionFalse),
		LastTransitionTime: metav1.Now(),
		Reason:             "ResumeJobs",
		Message:            "jobset is resumed",
	})
}