func()

in pkg/flink/resources.go [231:275]


func (fc *FlinkCluster) updateJobSpec(taskCtx FlinkTaskContext) error {
	if fc.Spec.Job == nil {
		fc.Spec.Job = &flinkOp.JobSpec{}
	}
	out := fc.Spec.Job

	out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations)
	out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels)

	out.ClassName = &taskCtx.Job.MainClass
	out.Args = taskCtx.Job.Args

	if taskCtx.Job.Parallelism != 0 {
		out.Parallelism = &taskCtx.Job.Parallelism
	}

	artifacts := GetJobArtifacts(&taskCtx.Job)

	if out.JarFile == nil && len(artifacts) == 1 {
		out.JarFile = &artifacts[0]
	} else {
		initContainers := []corev1.Container{}
		for _, container := range out.InitContainers {
			resultArgs := []string{}
			for _, arg := range container.Args {
				tmpl, err := containerTmpl.Parse(arg)
				if err != nil {
					return err
				}

				var tpl bytes.Buffer
				if err := tmpl.Execute(&tpl, NewContainerTemplateData(artifacts)); err != nil {
					return err
				}

				resultArgs = append(resultArgs, tpl.String())
			}
			container.Args = resultArgs
			initContainers = append(initContainers, container)
		}
		out.InitContainers = initContainers
	}

	return nil
}