func()

in pkg/controllers/jobset_controller.go [241:291]


func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, js *jobset.JobSet, jobs *childJobs) []jobset.ReplicatedJobStatus {
	log := ctrl.LoggerFrom(ctx)

	// Prepare replicatedJobsReady for optimal iteration
	replicatedJobsReady := map[string]map[string]int32{}
	for _, replicatedJob := range js.Spec.ReplicatedJobs {
		replicatedJobsReady[replicatedJob.Name] = map[string]int32{
			"ready":     0,
			"succeeded": 0,
			"failed":    0,
		}
	}

	// Calculate jobsReady for each Replicated Job
	for _, job := range jobs.active {
		ready := pointer.Int32Deref(job.Status.Ready, 0)
		// parallelism is always set as it is otherwise defaulted by k8s to 1
		podsCount := *(job.Spec.Parallelism)
		if job.Spec.Completions != nil && *job.Spec.Completions < podsCount {
			podsCount = *job.Spec.Completions
		}
		if job.Status.Succeeded+ready >= podsCount {
			if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" {
				replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["ready"]++
			} else {
				log.Error(nil, fmt.Sprintf("job %s missing ReplicatedJobName label", job.Name))
			}
		}
	}

	// Calculate succeededJobs
	for _, job := range jobs.successful {
		replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["succeeded"]++
	}

	for _, job := range jobs.failed {
		replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["failed"]++
	}

	// Calculate ReplicatedJobsStatus
	var rjStatus []jobset.ReplicatedJobStatus
	for name, status := range replicatedJobsReady {
		rjStatus = append(rjStatus, jobset.ReplicatedJobStatus{
			Name:      name,
			Ready:     status["ready"],
			Succeeded: status["succeeded"],
			Failed:    status["failed"],
		})
	}
	return rjStatus
}