in controllers/flinkcluster/flinkcluster_converter.go [417:448]
func newTaskManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
var imageSpec = flinkCluster.Spec.Image
var taskManagerSpec = flinkCluster.Spec.TaskManager
var dataPort = corev1.ContainerPort{Name: "data", ContainerPort: *taskManagerSpec.Ports.Data}
var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *taskManagerSpec.Ports.RPC}
var queryPort = corev1.ContainerPort{Name: "query", ContainerPort: *taskManagerSpec.Ports.Query}
var ports = []corev1.ContainerPort{dataPort, rpcPort, queryPort}
for _, port := range taskManagerSpec.ExtraPorts {
ports = append(ports, corev1.ContainerPort{Name: port.Name, ContainerPort: port.ContainerPort, Protocol: corev1.Protocol(port.Protocol)})
}
return &corev1.Container{
Name: "taskmanager",
Image: imageSpec.Name,
ImagePullPolicy: imageSpec.PullPolicy,
Args: []string{"taskmanager"},
Ports: ports,
LivenessProbe: taskManagerSpec.LivenessProbe,
ReadinessProbe: taskManagerSpec.ReadinessProbe,
Resources: taskManagerSpec.Resources,
Env: flinkCluster.Spec.EnvVars,
EnvFrom: flinkCluster.Spec.EnvFrom,
VolumeMounts: taskManagerSpec.VolumeMounts,
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
},
},
},
}
}