func newJobManagerContainer()

in controllers/flinkcluster/flinkcluster_converter.go [139:201]


// newJobManagerContainer builds the "jobmanager" container for flinkCluster.
//
// The container exposes the rpc, blob, query and ui ports taken from the
// JobManager spec, followed by any extra ports, and carries a preStop hook
// that runs "sleep" for preStopSleepSeconds. Image, probes, resources,
// environment and volume mounts are copied from the cluster spec as-is.
//
// When IsApplicationModeCluster reports true, the default "jobmanager"
// argument list is replaced by a "standalone-job" command line assembled from
// the job spec and the cluster status.
//
// Note: the four port pointers of the JobManager spec, and the job's
// ClassName in application mode, are dereferenced without a nil check.
func newJobManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
	spec := flinkCluster.Spec
	jm := spec.JobManager

	// Well-known ports first, user-supplied extra ports after them.
	containerPorts := []corev1.ContainerPort{
		{Name: "rpc", ContainerPort: *jm.Ports.RPC},
		{Name: "blob", ContainerPort: *jm.Ports.Blob},
		{Name: "query", ContainerPort: *jm.Ports.Query},
		{Name: "ui", ContainerPort: *jm.Ports.UI},
	}
	for _, extra := range jm.ExtraPorts {
		containerPorts = append(containerPorts, corev1.ContainerPort{
			Name:          extra.Name,
			ContainerPort: extra.ContainerPort,
			Protocol:      corev1.Protocol(extra.Protocol),
		})
	}

	preStop := &corev1.LifecycleHandler{
		Exec: &corev1.ExecAction{
			Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
		},
	}

	jmContainer := &corev1.Container{
		Name:            "jobmanager",
		Image:           spec.Image.Name,
		ImagePullPolicy: spec.Image.PullPolicy,
		Args:            []string{"jobmanager"},
		Ports:           containerPorts,
		LivenessProbe:   jm.LivenessProbe,
		ReadinessProbe:  jm.ReadinessProbe,
		Resources:       jm.Resources,
		Env:             spec.EnvVars,
		EnvFrom:         spec.EnvFrom,
		VolumeMounts:    jm.VolumeMounts,
		Lifecycle:       &corev1.Lifecycle{PreStop: preStop},
	}

	// Session-style clusters keep the plain "jobmanager" arguments.
	if !IsApplicationModeCluster(flinkCluster) {
		return jmContainer
	}

	job := flinkCluster.Spec.Job
	clusterStatus := flinkCluster.Status
	jobArgs := []string{"standalone-job"}

	// The parallelism flag is only added when it could be calculated; an
	// error from calJobParallelism simply leaves the flag out.
	if n, err := calJobParallelism(flinkCluster); err == nil {
		jobArgs = append(jobArgs, fmt.Sprintf("-Dparallelism.default=%d", n))
	}

	if savepoint := convertFromSavepoint(job, clusterStatus.Components.Job, &clusterStatus.Revision); savepoint != nil {
		jobArgs = append(jobArgs, "--fromSavepoint", *savepoint)
	}

	if allow := job.AllowNonRestoredState; allow != nil && *allow {
		jobArgs = append(jobArgs, "--allowNonRestoredState")
	}

	// The error returned by GenJobId is discarded; whatever ID it returned is
	// passed through unchanged.
	jobID, _ := GenJobId(flinkCluster)
	jobArgs = append(jobArgs, "--job-id", jobID, "--job-classname", *job.ClassName)

	// User-provided job arguments always come last.
	jmContainer.Args = append(jobArgs, job.Args...)

	return jmContainer
}