in controllers/flinkcluster/flinkcluster_converter.go [875:933]
func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
jobSpec := flinkCluster.Spec.Job
if jobSpec == nil {
return nil
}
recorded := flinkCluster.Status
jobManagerSpec := flinkCluster.Spec.JobManager
labels := getClusterLabels(flinkCluster)
labels = mergeLabels(labels, getRevisionHashLabels(&recorded.Revision))
var jobName string
var annotations map[string]string
var podSpec *corev1.PodSpec
if IsApplicationModeCluster(flinkCluster) {
jobId, _ := GenJobId(flinkCluster)
labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
labels = mergeLabels(labels, jobManagerSpec.PodLabels)
labels = mergeLabels(labels, map[string]string{JobIdLabel: jobId})
jobName = getJobManagerJobName(flinkCluster.Name)
annotations = jobManagerSpec.PodAnnotations
mainContainer := newJobManagerContainer(flinkCluster)
podSpec = newJobManagerPodSpec(mainContainer, flinkCluster)
} else {
jobName = getSubmitterJobName(flinkCluster.Name)
labels = mergeLabels(labels, jobSpec.PodLabels)
annotations = jobSpec.PodAnnotations
podSpec = newJobSubmitterPodSpec(flinkCluster)
}
// Disable the retry mechanism of k8s Job, all retries should be initiated
// by the operator based on the job restart policy. This is because Flink
// jobs are stateful, if a job fails after running for 10 hours, we probably
// don't want to start over from the beginning, instead we want to resume
// the job from the latest savepoint which means strictly speaking it is no
// longer the same job as the previous one because the `--fromSavepoint`
// parameter has changed.
podSpec.RestartPolicy = corev1.RestartPolicyNever
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: flinkCluster.Namespace,
Name: jobName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: labels,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: annotations,
},
Spec: *podSpec,
},
BackoffLimit: &backoffLimit,
},
}
}