func()

in pkg/controllers/job/job_controller_actions.go [630:724]


// createOrUpdatePodGroup makes sure a PodGroup with the Job's name exists in
// the Job's namespace and that its spec agrees with the Job.
//
// If the lister reports NotFound, a PodGroup is created from the Job; an
// AlreadyExists error from that create call is tolerated. Otherwise the
// PodGroup's PriorityClassName, MinMember, MinResources and per-task
// MinTaskMember are compared with the values derived from the Job, and an
// Update is issued only when at least one of them differs.
//
// Any other error from the lister, the create call or the update call is
// returned to the caller.
func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
	// Desired per-task minimum: the task's MinAvailable when set, otherwise
	// its Replicas. Computed once so the create and update paths agree.
	wantTaskMember := make(map[string]int32, len(job.Spec.Tasks))
	for _, task := range job.Spec.Tasks {
		if task.MinAvailable != nil {
			wantTaskMember[task.Name] = *task.MinAvailable
		} else {
			wantTaskMember[task.Name] = task.Replicas
		}
	}

	// If PodGroup does not exist, create one for Job.
	pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
				job.Namespace, job.Name, err)
			return err
		}

		newPG := &scheduling.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:   job.Namespace,
				Name:        job.Name,
				Annotations: job.Annotations,
				Labels:      job.Labels,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(job, helpers.JobKind),
				},
			},
			Spec: scheduling.PodGroupSpec{
				MinMember:         job.Spec.MinAvailable,
				MinTaskMember:     wantTaskMember,
				Queue:             job.Spec.Queue,
				MinResources:      cc.calcPGMinResources(job),
				PriorityClassName: job.Spec.PriorityClassName,
			},
		}

		if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), newPG, metav1.CreateOptions{}); err != nil {
			// AlreadyExists is not treated as a failure: the PodGroup is
			// there, which is all this branch needs.
			if !apierrors.IsAlreadyExists(err) {
				klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
					job.Namespace, job.Name, err)
				return err
			}
		}
		return nil
	}

	// pg was handed out by the lister. By client-go convention such objects
	// are read-only, so all modifications below are made on a private copy.
	pg = pg.DeepCopy()

	pgShouldUpdate := false
	if pg.Spec.PriorityClassName != job.Spec.PriorityClassName {
		pg.Spec.PriorityClassName = job.Spec.PriorityClassName
		pgShouldUpdate = true
	}

	minResources := cc.calcPGMinResources(job)
	if pg.Spec.MinMember != job.Spec.MinAvailable || !reflect.DeepEqual(pg.Spec.MinResources, minResources) {
		pg.Spec.MinMember = job.Spec.MinAvailable
		pg.Spec.MinResources = minResources
		pgShouldUpdate = true
	}

	if pg.Spec.MinTaskMember == nil {
		pgShouldUpdate = true
		pg.Spec.MinTaskMember = make(map[string]int32, len(wantTaskMember))
	}

	// Bring every task's entry in line with the desired value. Tasks without
	// an explicit MinAvailable are included (using Replicas), matching what
	// the create path writes, so a replica change is not left stale.
	for name, want := range wantTaskMember {
		if have, ok := pg.Spec.MinTaskMember[name]; ok && have == want {
			continue
		}
		pg.Spec.MinTaskMember[name] = want
		pgShouldUpdate = true
	}

	if !pgShouldUpdate {
		return nil
	}

	_, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{})
	if err != nil {
		klog.V(3).Infof("Failed to update PodGroup for Job <%s/%s>: %v",
			job.Namespace, job.Name, err)
	}
	return err
}