in pkg/controllers/job/job_controller_actions.go [630:724]
// createOrUpdatePodGroup makes sure a PodGroup with the Job's namespace and
// name exists and that its spec matches the Job.
//
// If no PodGroup is found it creates one owned by the Job, copying the Job's
// labels, annotations, queue, priority class, MinAvailable and the computed
// minimum resources. An AlreadyExists error on create is treated as success.
//
// If a PodGroup is found, its PriorityClassName, MinMember, MinResources and
// MinTaskMember entries are reconciled against the Job, and an Update is
// issued only when at least one of them changed.
//
// It returns any error from the lookup (other than NotFound), from the create
// (other than AlreadyExists), or from the update.
func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
	// Desired per-task minimum: the task's MinAvailable when set, otherwise its
	// replica count. Computed once so the create and update paths cannot drift
	// apart.
	minTaskMember := make(map[string]int32, len(job.Spec.Tasks))
	for _, task := range job.Spec.Tasks {
		if task.MinAvailable != nil {
			minTaskMember[task.Name] = *task.MinAvailable
		} else {
			minTaskMember[task.Name] = task.Replicas
		}
	}

	cached, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
				job.Namespace, job.Name, err)
			return err
		}

		// If PodGroup does not exist, create one for Job.
		newPG := &scheduling.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:   job.Namespace,
				Name:        job.Name,
				Annotations: job.Annotations,
				Labels:      job.Labels,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(job, helpers.JobKind),
				},
			},
			Spec: scheduling.PodGroupSpec{
				MinMember:         job.Spec.MinAvailable,
				MinTaskMember:     minTaskMember,
				Queue:             job.Spec.Queue,
				MinResources:      cc.calcPGMinResources(job),
				PriorityClassName: job.Spec.PriorityClassName,
			},
		}

		if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), newPG, metav1.CreateOptions{}); err != nil {
			// Another worker may have created it in the meantime; that is fine.
			if !apierrors.IsAlreadyExists(err) {
				klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
					job.Namespace, job.Name, err)
				return err
			}
		}
		return nil
	}

	// The lister may hand back a pointer shared with the informer cache
	// (client-go listers do); such objects must be treated as read-only.
	// Reconcile on a private copy so a failed Update cannot leave the cache
	// holding a spec the API server never accepted.
	pg := cached.DeepCopy()

	pgShouldUpdate := false
	if pg.Spec.PriorityClassName != job.Spec.PriorityClassName {
		pg.Spec.PriorityClassName = job.Spec.PriorityClassName
		pgShouldUpdate = true
	}

	// NOTE(review): if MinResources holds resource quantities, reflect.DeepEqual
	// can report a difference for semantically equal values; confirm whether a
	// semantic comparison is preferable here.
	minResources := cc.calcPGMinResources(job)
	if pg.Spec.MinMember != job.Spec.MinAvailable || !reflect.DeepEqual(pg.Spec.MinResources, minResources) {
		pg.Spec.MinMember = job.Spec.MinAvailable
		pg.Spec.MinResources = minResources
		pgShouldUpdate = true
	}

	if pg.Spec.MinTaskMember == nil {
		pgShouldUpdate = true
		pg.Spec.MinTaskMember = make(map[string]int32, len(minTaskMember))
	}

	// Add or correct every entry the Job currently asks for. Entries for tasks
	// that no longer appear in the Job are left untouched, as before.
	for name, want := range minTaskMember {
		if have, ok := pg.Spec.MinTaskMember[name]; ok && have == want {
			continue
		}
		pg.Spec.MinTaskMember[name] = want
		pgShouldUpdate = true
	}

	if !pgShouldUpdate {
		return nil
	}

	_, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{})
	if err != nil {
		klog.V(3).Infof("Failed to update PodGroup for Job <%s/%s>: %v",
			job.Namespace, job.Name, err)
	}
	return err
}