in pkg/controllers/job/job_controller_actions.go [542:594]
// createJobIOIfNotExist walks job.Spec.Volumes and makes sure every volume
// is backed by a named PVC.
//
// For a volume that already names a claim, the claim must exist (as reported
// by cc.checkPVCExist); otherwise an error is returned. For a volume without
// a claim name, a fresh name is generated with jobhelpers.GenPVCName, written
// back into the job spec, and, when the volume carries a VolumeClaim, a PVC
// is created through cc.createPVC.
//
// Every resolved claim name is recorded in job.Status.ControlledResources
// under the key "volume-pvc-<name>". If at least one name was generated, the
// Job is updated through the API client and the object returned by that
// update is handed back with the in-memory status copied onto it. On any
// error the passed-in job is returned together with the error.
func (cc *jobcontroller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) {
	if job.Status.ControlledResources == nil {
		job.Status.ControlledResources = make(map[string]string)
	}

	// specChanged is set once any generated claim name has been written into
	// the spec, which means the Job object must be pushed back to the server.
	specChanged := false

	for i := range job.Spec.Volumes {
		vol := job.Spec.Volumes[i]
		claimName := vol.VolumeClaimName

		if len(claimName) != 0 {
			// The user referenced a claim by name: it has to be there already.
			found, err := cc.checkPVCExist(job, claimName)
			if err != nil {
				return job, err
			}
			if !found {
				return job, fmt.Errorf("pvc %s is not found, the job will be in the Pending state until the PVC is created", claimName)
			}
		} else {
			// Keep generating candidate names until one is not already taken,
			// so that a generated name never collides with an existing PVC.
			for {
				claimName = jobhelpers.GenPVCName(job.Name)
				found, err := cc.checkPVCExist(job, claimName)
				if err != nil {
					return job, err
				}
				if !found {
					break
				}
			}
			job.Spec.Volumes[i].VolumeClaimName = claimName
			specChanged = true

			// TODO: check VolumeClaim must be set if VolumeClaimName is empty
			if vol.VolumeClaim != nil {
				if err := cc.createPVC(job, claimName, vol.VolumeClaim); err != nil {
					return job, err
				}
			}
		}

		resourceKey := "volume-pvc-" + claimName
		job.Status.ControlledResources[resourceKey] = claimName
	}

	// Nothing was generated, so the spec is unchanged and no update is needed.
	if !specChanged {
		return job, nil
	}

	updated, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
			job.Namespace, job.Name, err)
		return job, err
	}

	// Carry the locally accumulated status over to the object returned by the
	// update call.
	updated.Status = job.Status
	return updated, nil
}