func newJob()

in controllers/flinkcluster/flinkcluster_converter.go [875:933]


// newJob builds the batch Job object for flinkCluster. It returns nil when the
// cluster spec carries no job section.
//
// Both the Job and its pod template receive the same label set, which starts
// from the cluster labels merged with the revision hash labels of the recorded
// status. What is added on top depends on the cluster mode:
//
//   - application mode (per IsApplicationModeCluster): the "jobmanager"
//     component labels, the JobManager pod labels and a JobIdLabel entry are
//     merged in; the Job is named by getJobManagerJobName, annotated with the
//     JobManager pod annotations, and runs the JobManager pod spec.
//   - otherwise: the job spec's pod labels are merged in; the Job is named by
//     getSubmitterJobName, annotated with the job spec's pod annotations, and
//     runs the job submitter pod spec.
//
// The returned Job is owned by flinkCluster through ToOwnerReference.
func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
	if flinkCluster.Spec.Job == nil {
		return nil
	}

	// Work on a copy of the status so the revision pointer handed to
	// getRevisionHashLabels does not alias the cluster object.
	status := flinkCluster.Status
	labels := mergeLabels(getClusterLabels(flinkCluster), getRevisionHashLabels(&status.Revision))

	var (
		name           string
		podAnnotations map[string]string
		pod            *corev1.PodSpec
	)

	switch {
	case IsApplicationModeCluster(flinkCluster):
		jobManager := flinkCluster.Spec.JobManager

		// NOTE(review): the error returned by GenJobId is discarded, so
		// whatever ID value accompanies a failure is used as the label
		// value — confirm that this is intended.
		jobID, _ := GenJobId(flinkCluster)

		labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
		labels = mergeLabels(labels, jobManager.PodLabels)
		labels = mergeLabels(labels, map[string]string{JobIdLabel: jobID})

		name = getJobManagerJobName(flinkCluster.Name)
		podAnnotations = jobManager.PodAnnotations
		pod = newJobManagerPodSpec(newJobManagerContainer(flinkCluster), flinkCluster)
	default:
		submitter := flinkCluster.Spec.Job

		name = getSubmitterJobName(flinkCluster.Name)
		labels = mergeLabels(labels, submitter.PodLabels)
		podAnnotations = submitter.PodAnnotations
		pod = newJobSubmitterPodSpec(flinkCluster)
	}

	// Kubernetes must never restart the pod on its own: retries are driven by
	// the operator according to the job restart policy. Flink jobs are
	// stateful, so after a failure (say, ten hours into a run) the job should
	// resume from the latest savepoint rather than start from scratch. Since
	// that changes the `--fromSavepoint` parameter, the retried run is,
	// strictly speaking, a different job from the one that failed.
	pod.RestartPolicy = corev1.RestartPolicyNever

	template := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      labels,
			Annotations: podAnnotations,
		},
		Spec: *pod,
	}

	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       flinkCluster.Namespace,
			Name:            name,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels:          labels,
		},
		Spec: batchv1.JobSpec{
			Template: template,
			// backoffLimit is declared outside this function.
			BackoffLimit: &backoffLimit,
		},
	}
}