func newJobSubmitterPodSpec()

in controllers/flinkcluster/flinkcluster_converter.go [758:873]


// newJobSubmitterPodSpec assembles the PodSpec of the job submitter pod from
// the given FlinkCluster. It returns nil when the cluster spec has no job.
//
// The single "main" container runs `bash <submitJobScriptPath>` followed by
// the submission flags derived from the job spec, the optional jar file, the
// optional Python options and finally the user-supplied job arguments.
func newJobSubmitterPodSpec(flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec {
	job := flinkCluster.Spec.Job
	if job == nil {
		return nil
	}

	// status is a local copy; the revision pointer handed to
	// convertFromSavepoint below refers to this copy, not to flinkCluster.
	status := flinkCluster.Status
	spec := flinkCluster.Spec
	image := spec.Image
	name := flinkCluster.Name
	jmAddr := fmt.Sprintf(
		"%s:%d", getJobManagerServiceName(name), *spec.JobManager.Ports.UI)

	args := []string{"bash", submitJobScriptPath, "--jobmanager", jmAddr}
	// addArgs appends the given values to the submitter command line.
	addArgs := func(values ...string) {
		args = append(args, values...)
	}
	// addOptional appends "flag value" only when value is set.
	addOptional := func(flag string, value *string) {
		if value != nil {
			addArgs(flag, *value)
		}
	}

	addOptional("--class", job.ClassName)
	addOptional("--fromSavepoint",
		convertFromSavepoint(job, status.Components.Job, &status.Revision))

	if job.AllowNonRestoredState != nil && *job.AllowNonRestoredState {
		addArgs("--allowNonRestoredState")
	}

	// The parallelism flag is simply left out when it cannot be calculated.
	parallelism, err := calJobParallelism(flinkCluster)
	if err == nil {
		addArgs("--parallelism", fmt.Sprint(parallelism))
	}

	// NoLoggingToStdout is expressed through the --sysoutLogging flag.
	if job.NoLoggingToStdout != nil && *job.NoLoggingToStdout {
		addArgs("--sysoutLogging")
	}

	if mode := job.Mode; mode != nil && *mode == v1beta1.JobModeDetached {
		addArgs("--detached")
	}

	// One "-C <url>" pair per classpath entry.
	for _, url := range job.ClassPath {
		addArgs("-C", url)
	}

	// The job manager address always comes first, then the cluster-wide
	// environment variables.
	envVars := append([]corev1.EnvVar{{
		Name:  jobManagerAddrEnvVar,
		Value: jmAddr,
	}}, spec.EnvVars...)

	// Job-defined volumes and mounts first, then the submit job script
	// volume with its script and config mounts.
	volumes := append([]corev1.Volume(nil), job.Volumes...)
	volumeMounts := append([]corev1.VolumeMount(nil), job.VolumeMounts...)
	scriptVolume, scriptMount, confMount := convertSubmitJobScript(name)
	volumes = append(volumes, *scriptVolume)
	volumeMounts = append(volumeMounts, *scriptMount, *confMount)

	// The jar file is passed as a bare argument after the flags above.
	if job.JarFile != nil {
		addArgs(*job.JarFile)
	}
	addOptional("--python", job.PyFile)
	addOptional("--pyFiles", job.PyFiles)
	addOptional("--pyModule", job.PyModule)

	// User-supplied job arguments always come last.
	addArgs(job.Args...)

	submitter := corev1.Container{
		Name:            "main",
		Image:           image.Name,
		ImagePullPolicy: image.PullPolicy,
		Args:            args,
		Env:             envVars,
		EnvFrom:         spec.EnvFrom,
		VolumeMounts:    volumeMounts,
		Resources:       job.Resources,
	}

	podSpec := &corev1.PodSpec{
		InitContainers:     convertContainers(job.InitContainers, volumeMounts, envVars),
		Containers:         []corev1.Container{submitter},
		RestartPolicy:      corev1.RestartPolicyNever,
		Volumes:            volumes,
		ImagePullSecrets:   image.PullSecrets,
		SecurityContext:    job.SecurityContext,
		HostAliases:        job.HostAliases,
		ServiceAccountName: getServiceAccountName(spec.ServiceAccountName),
		Affinity:           job.Affinity,
		NodeSelector:       job.NodeSelector,
		Tolerations:        job.Tolerations,
	}

	// Apply the Flink, Hadoop and GCP configuration to the finished pod spec.
	setFlinkConfig(getConfigMapName(name), podSpec)
	setHadoopConfig(spec.HadoopConfig, podSpec)
	setGCPConfig(spec.GCPConfig, podSpec)

	return podSpec
}