in pkg/flink/resources.go [231:275]
func (fc *FlinkCluster) updateJobSpec(taskCtx FlinkTaskContext) error {
if fc.Spec.Job == nil {
fc.Spec.Job = &flinkOp.JobSpec{}
}
out := fc.Spec.Job
out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations)
out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels)
out.ClassName = &taskCtx.Job.MainClass
out.Args = taskCtx.Job.Args
if taskCtx.Job.Parallelism != 0 {
out.Parallelism = &taskCtx.Job.Parallelism
}
artifacts := GetJobArtifacts(&taskCtx.Job)
if out.JarFile == nil && len(artifacts) == 1 {
out.JarFile = &artifacts[0]
} else {
initContainers := []corev1.Container{}
for _, container := range out.InitContainers {
resultArgs := []string{}
for _, arg := range container.Args {
tmpl, err := containerTmpl.Parse(arg)
if err != nil {
return err
}
var tpl bytes.Buffer
if err := tmpl.Execute(&tpl, NewContainerTemplateData(artifacts)); err != nil {
return err
}
resultArgs = append(resultArgs, tpl.String())
}
container.Args = resultArgs
initContainers = append(initContainers, container)
}
out.InitContainers = initContainers
}
return nil
}