func newTaskManagerContainer()

in controllers/flinkcluster/flinkcluster_converter.go [417:448]


// newTaskManagerContainer assembles the "taskmanager" container definition
// from the given FlinkCluster.
//
// The container's port list starts with the data, rpc and query ports from the
// TaskManager spec, followed by every entry of ExtraPorts in declaration
// order. The three fixed port values are dereferenced without a nil check, so
// they must already be populated when this function is called.
//
// Image settings, probes, resources, volume mounts and environment sources are
// taken from the spec as-is (slices and pointers are shared, not copied). A
// preStop hook executing `sleep` with preStopSleepSeconds as its argument is
// always attached.
func newTaskManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
	spec := &flinkCluster.Spec
	image := spec.Image
	tm := spec.TaskManager

	// Fixed Flink ports first; user-declared extra ports are appended after.
	containerPorts := []corev1.ContainerPort{
		{Name: "data", ContainerPort: *tm.Ports.Data},
		{Name: "rpc", ContainerPort: *tm.Ports.RPC},
		{Name: "query", ContainerPort: *tm.Ports.Query},
	}
	for i := range tm.ExtraPorts {
		extra := tm.ExtraPorts[i]
		containerPorts = append(containerPorts, corev1.ContainerPort{
			Name:          extra.Name,
			ContainerPort: extra.ContainerPort,
			Protocol:      corev1.Protocol(extra.Protocol),
		})
	}

	// The preStop handler runs `sleep <preStopSleepSeconds>` inside the container.
	sleepBeforeStop := &corev1.LifecycleHandler{
		Exec: &corev1.ExecAction{
			Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
		},
	}

	container := corev1.Container{
		Name:            "taskmanager",
		Image:           image.Name,
		ImagePullPolicy: image.PullPolicy,
		Args:            []string{"taskmanager"},
		Ports:           containerPorts,
		LivenessProbe:   tm.LivenessProbe,
		ReadinessProbe:  tm.ReadinessProbe,
		Resources:       tm.Resources,
		Env:             spec.EnvVars,
		EnvFrom:         spec.EnvFrom,
		VolumeMounts:    tm.VolumeMounts,
		Lifecycle:       &corev1.Lifecycle{PreStop: sleepBeforeStop},
	}
	return &container
}