in pkg/controllers/jobset_controller.go [241:291]
func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, js *jobset.JobSet, jobs *childJobs) []jobset.ReplicatedJobStatus {
log := ctrl.LoggerFrom(ctx)
// Prepare replicatedJobsReady for optimal iteration
replicatedJobsReady := map[string]map[string]int32{}
for _, replicatedJob := range js.Spec.ReplicatedJobs {
replicatedJobsReady[replicatedJob.Name] = map[string]int32{
"ready": 0,
"succeeded": 0,
"failed": 0,
}
}
// Calculate jobsReady for each Replicated Job
for _, job := range jobs.active {
ready := pointer.Int32Deref(job.Status.Ready, 0)
// parallelism is always set as it is otherwise defaulted by k8s to 1
podsCount := *(job.Spec.Parallelism)
if job.Spec.Completions != nil && *job.Spec.Completions < podsCount {
podsCount = *job.Spec.Completions
}
if job.Status.Succeeded+ready >= podsCount {
if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" {
replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["ready"]++
} else {
log.Error(nil, fmt.Sprintf("job %s missing ReplicatedJobName label", job.Name))
}
}
}
// Calculate succeededJobs
for _, job := range jobs.successful {
replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["succeeded"]++
}
for _, job := range jobs.failed {
replicatedJobsReady[job.Labels[jobset.ReplicatedJobNameKey]]["failed"]++
}
// Calculate ReplicatedJobsStatus
var rjStatus []jobset.ReplicatedJobStatus
for name, status := range replicatedJobsReady {
rjStatus = append(rjStatus, jobset.ReplicatedJobStatus{
Name: name,
Ready: status["ready"],
Succeeded: status["succeeded"],
Failed: status["failed"],
})
}
return rjStatus
}