in controllers/flinkcluster/flinkcluster_converter.go [139:201]
func newJobManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
var clusterSpec = flinkCluster.Spec
var imageSpec = clusterSpec.Image
var jobManagerSpec = clusterSpec.JobManager
var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *jobManagerSpec.Ports.RPC}
var blobPort = corev1.ContainerPort{Name: "blob", ContainerPort: *jobManagerSpec.Ports.Blob}
var queryPort = corev1.ContainerPort{Name: "query", ContainerPort: *jobManagerSpec.Ports.Query}
var uiPort = corev1.ContainerPort{Name: "ui", ContainerPort: *jobManagerSpec.Ports.UI}
var ports = []corev1.ContainerPort{rpcPort, blobPort, queryPort, uiPort}
for _, port := range jobManagerSpec.ExtraPorts {
ports = append(ports, corev1.ContainerPort{Name: port.Name, ContainerPort: port.ContainerPort, Protocol: corev1.Protocol(port.Protocol)})
}
container := &corev1.Container{
Name: "jobmanager",
Image: imageSpec.Name,
ImagePullPolicy: imageSpec.PullPolicy,
Args: []string{"jobmanager"},
Ports: ports,
LivenessProbe: jobManagerSpec.LivenessProbe,
ReadinessProbe: jobManagerSpec.ReadinessProbe,
Resources: jobManagerSpec.Resources,
Env: flinkCluster.Spec.EnvVars,
EnvFrom: flinkCluster.Spec.EnvFrom,
VolumeMounts: jobManagerSpec.VolumeMounts,
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
},
},
},
}
if IsApplicationModeCluster(flinkCluster) {
jobSpec := flinkCluster.Spec.Job
status := flinkCluster.Status
args := []string{"standalone-job"}
if parallelism, err := calJobParallelism(flinkCluster); err == nil {
args = append(args, fmt.Sprintf("-Dparallelism.default=%d", parallelism))
}
var fromSavepoint = convertFromSavepoint(jobSpec, status.Components.Job, &status.Revision)
if fromSavepoint != nil {
args = append(args, "--fromSavepoint", *fromSavepoint)
}
if jobSpec.AllowNonRestoredState != nil && *jobSpec.AllowNonRestoredState {
args = append(args, "--allowNonRestoredState")
}
jobId, _ := GenJobId(flinkCluster)
args = append(args,
"--job-id", jobId,
"--job-classname", *jobSpec.ClassName,
)
args = append(args, jobSpec.Args...)
container.Args = args
}
return container
}