in controllers/flinkcluster/flinkcluster_converter.go [758:873]
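// newJobSubmitterPodSpec builds the PodSpec of the job submitter pod, which
// runs the submit-job script to submit the configured job to the cluster's
// JobManager.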
func newJobSubmitterPodSpec(flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec {
	var jobSpec = flinkCluster.Spec.Job
	if jobSpec == nil {
		return nil
	}
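	// The submitter talks to the JobManager through its service; build the
	// address from the service name and the configured UI port.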
	var status = flinkCluster.Status
	var clusterSpec = flinkCluster.Spec
	var imageSpec = clusterSpec.Image
	var serviceAccount = clusterSpec.ServiceAccountName
	var jobManagerSpec = clusterSpec.JobManager
	var clusterName = flinkCluster.Name
	var jobManagerServiceName = getJobManagerServiceName(clusterName)
	var jobManagerAddress = fmt.Sprintf(
		"%s:%d", jobManagerServiceName, *jobManagerSpec.Ports.UI)
	var jobArgs = []string{"bash", submitJobScriptPath}
	jobArgs = append(jobArgs, "--jobmanager", jobManagerAddress)
	if jobSpec.ClassName != nil {
		jobArgs = append(jobArgs, "--class", *jobSpec.ClassName)
	}
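	// Optionally restore from a savepoint, resolved from the job spec and the
	// recorded job status/revision.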
	var fromSavepoint = convertFromSavepoint(jobSpec, status.Components.Job, &status.Revision)
	if fromSavepoint != nil {
		jobArgs = append(jobArgs, "--fromSavepoint", *fromSavepoint)
	}
	if jobSpec.AllowNonRestoredState != nil &&
		*jobSpec.AllowNonRestoredState {
		jobArgs = append(jobArgs, "--allowNonRestoredState")
	}
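	// Parallelism is computed from the cluster spec; if the calculation
	// fails the flag is simply omitted, leaving parallelism to the cluster's
	// configured default.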
	if parallelism, err := calJobParallelism(flinkCluster); err == nil {
		jobArgs = append(jobArgs, "--parallelism", fmt.Sprint(parallelism))
	}
	if jobSpec.NoLoggingToStdout != nil &&
		*jobSpec.NoLoggingToStdout {
		jobArgs = append(jobArgs, "--sysoutLogging")
	}
	if jobSpec.Mode != nil && *jobSpec.Mode == v1beta1.JobModeDetached {
		jobArgs = append(jobArgs, "--detached")
	}
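	// Each classpath URL is passed to the CLI individually via -C.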
	if jobSpec.ClassPath != nil && len(jobSpec.ClassPath) > 0 {
		for _, u := range jobSpec.ClassPath {
			jobArgs = append(jobArgs, "-C", u)
		}
	}
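	// Expose the JobManager address as an environment variable (also passed
	// to init containers below), followed by any user-supplied variables.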
	envVars := []corev1.EnvVar{{
		Name:  jobManagerAddrEnvVar,
		Value: jobManagerAddress,
	}}
	envVars = append(envVars, flinkCluster.Spec.EnvVars...)
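	// User-defined volumes and mounts come first; the operator then adds the
	// submit-job script volume and its config mount.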
	var volumes []corev1.Volume
	var volumeMounts []corev1.VolumeMount
	volumes = append(volumes, jobSpec.Volumes...)
	volumeMounts = append(volumeMounts, jobSpec.VolumeMounts...)
	// Submit job script config.
	sbsVolume, sbsMount, confMount := convertSubmitJobScript(clusterName)
	volumes = append(volumes, *sbsVolume)
	volumeMounts = append(volumeMounts, *sbsMount, *confMount)
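	// The program artifact (jar file or Python entry point) goes last,
	// followed by any user-supplied program arguments.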
	if jobSpec.JarFile != nil {
		jobArgs = append(jobArgs, *jobSpec.JarFile)
	}
	if jobSpec.PyFile != nil {
		jobArgs = append(jobArgs, "--python", *jobSpec.PyFile)
	}
	if jobSpec.PyFiles != nil {
		jobArgs = append(jobArgs, "--pyFiles", *jobSpec.PyFiles)
	}
	if jobSpec.PyModule != nil {
		jobArgs = append(jobArgs, "--pyModule", *jobSpec.PyModule)
	}
	jobArgs = append(jobArgs, jobSpec.Args...)
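	// Assemble the pod spec: a single "main" container that runs the submit
	// script, plus any user-defined init containers; scheduling and security
	// settings come from the job spec.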
	podSpec := &corev1.PodSpec{
		InitContainers: convertContainers(jobSpec.InitContainers, volumeMounts, envVars),
		Containers: []corev1.Container{
			{
				Name:            "main",
				Image:           imageSpec.Name,
				ImagePullPolicy: imageSpec.PullPolicy,
				Args:            jobArgs,
				Env:             envVars,
				EnvFrom:         flinkCluster.Spec.EnvFrom,
				VolumeMounts:    volumeMounts,
				Resources:       jobSpec.Resources,
			},
		},
		RestartPolicy:      corev1.RestartPolicyNever,
		Volumes:            volumes,
		ImagePullSecrets:   imageSpec.PullSecrets,
		SecurityContext:    jobSpec.SecurityContext,
		HostAliases:        jobSpec.HostAliases,
		ServiceAccountName: getServiceAccountName(serviceAccount),
		Affinity:           jobSpec.Affinity,
		NodeSelector:       jobSpec.NodeSelector,
		Tolerations:        jobSpec.Tolerations,
	}
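	// Finally, wire the generated Flink config and optional Hadoop/GCP
	// configuration into the pod.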
	setFlinkConfig(getConfigMapName(flinkCluster.Name), podSpec)
	setHadoopConfig(flinkCluster.Spec.HadoopConfig, podSpec)
	setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec)
	return podSpec
}
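// Illustration only (hypothetical values, not part of the controller): for a
// FlinkCluster with class "org.example.WordCount", jar "/opt/flink/job.jar",
// detached mode, and the default UI port 8081, the "main" container's args
// come out roughly as:
//
//	bash <submitJobScriptPath> \
//	    --jobmanager <job-manager-service-name>:8081 \
//	    --class org.example.WordCount \
//	    --parallelism <calculated parallelism> \
//	    --detached \
//	    /opt/flink/job.jar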