controllers/flinkcluster/flinkcluster_converter.go

/*
Copyright 2019 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flinkcluster

import (
	"fmt"
	"math"
	"regexp"
	"sort"
	"strconv"
	"strings"

	"k8s.io/apimachinery/pkg/api/resource"

	v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
	"github.com/spotify/flink-on-k8s-operator/internal/model"
	"github.com/spotify/flink-on-k8s-operator/internal/util"

	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	"github.com/hashicorp/go-version"
)

// Converter which converts the FlinkCluster spec to the desired
// underlying Kubernetes resource specs.

const (
	preStopSleepSeconds     = 30
	flinkConfigMapPath      = "/opt/flink/conf"
	flinkConfigMapVolume    = "flink-config-volume"
	submitJobScriptPath     = "/opt/flink-operator/submit-job.sh"
	gcpServiceAccountVolume = "gcp-service-account-volume"
	hadoopConfigVolume      = "hadoop-config-volume"

	jobManagerAddrEnvVar = "FLINK_JM_ADDR"
	jobJarUriEnvVar      = "FLINK_JOB_JAR_URI"
	jobPyFileUriEnvVar   = "FLINK_JOB_PY_FILE_URI"
	jobPyFilesUriEnvVar  = "FLINK_JOB_PY_FILES_URI"
	hadoopConfDirEnvVar  = "HADOOP_CONF_DIR"
	gacEnvVar            = "GOOGLE_APPLICATION_CREDENTIALS"
)

var (
	backoffLimit                  int32 = 0
	terminationGracePeriodSeconds int64 = 60

	flinkSysProps = map[string]struct{}{
		"jobmanager.rpc.address": {},
		"jobmanager.rpc.port":    {},
		"blob.server.port":       {},
		"query.server.port":      {},
		"rest.port":              {},
	}

	v10, _ = version.NewVersion("1.10")
)

// Gets the desired state of a cluster.
func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredClusterState {
	state := &model.DesiredClusterState{}
	cluster := observed.cluster

	// The cluster has been deleted, all resources should be cleaned up.
	if cluster == nil {
		return state
	}

	jobSpec := cluster.Spec.Job
	applicationMode := IsApplicationModeCluster(cluster)

	if !shouldCleanup(cluster, "ConfigMap") {
		state.ConfigMap = newConfigMap(cluster)
	}
	if !shouldCleanup(cluster, "PodDisruptionBudget") {
		state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
	}
	if !shouldCleanup(cluster, "HorizontalPodAutoscaler") {
		state.HorizontalPodAutoscaler = newHorizontalPodAutoscaler(cluster)
	}
	if !shouldCleanup(cluster, "JobManager") && !applicationMode {
		state.JmStatefulSet = newJobManagerStatefulSet(cluster)
	}
	if !shouldCleanup(cluster, "TaskManager") {
		switch cluster.Spec.TaskManager.DeploymentType {
		case v1beta1.DeploymentTypeStatefulSet:
			state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
		case v1beta1.DeploymentTypeDeployment:
			state.TmDeployment = newTaskManagerDeployment(cluster)
		}
	}
	if !shouldCleanup(cluster, "TaskManagerService") {
		state.TmService = newTaskManagerService(cluster)
	}
	if !shouldCleanup(cluster, "JobManagerService") {
		state.JmService = newJobManagerService(cluster)
	}
	if !shouldCleanup(cluster, "JobManagerIngress") {
		state.JmIngress = newJobManagerIngress(cluster)
	}

	if jobSpec != nil {
		jobStatus := cluster.Status.Components.Job
		keepJobState := (shouldStopJob(cluster) || jobStatus.IsStopped()) &&
			(!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) &&
			shouldCleanup(cluster, "Job")
		if !keepJobState {
			state.Job = newJob(cluster)
		}
	}

	return state
}

func newJobManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
	var clusterSpec = flinkCluster.Spec
	var imageSpec = clusterSpec.Image
	var jobManagerSpec = clusterSpec.JobManager
	var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *jobManagerSpec.Ports.RPC}
	var blobPort = corev1.ContainerPort{Name: "blob", ContainerPort: *jobManagerSpec.Ports.Blob}
	var queryPort = corev1.ContainerPort{Name: "query", ContainerPort: *jobManagerSpec.Ports.Query}
	var uiPort = corev1.ContainerPort{Name: "ui", ContainerPort: *jobManagerSpec.Ports.UI}
	var ports = []corev1.ContainerPort{rpcPort, blobPort, queryPort, uiPort}

	for _, port := range jobManagerSpec.ExtraPorts {
		ports = append(ports, corev1.ContainerPort{Name: port.Name, ContainerPort: port.ContainerPort, Protocol: corev1.Protocol(port.Protocol)})
	}

	container := &corev1.Container{
		Name: "jobmanager",
		Image: imageSpec.Name,
		ImagePullPolicy: imageSpec.PullPolicy,
		Args: []string{"jobmanager"},
		Ports: ports,
		LivenessProbe: jobManagerSpec.LivenessProbe,
		ReadinessProbe: jobManagerSpec.ReadinessProbe,
		Resources: jobManagerSpec.Resources,
		Env: flinkCluster.Spec.EnvVars,
		EnvFrom: flinkCluster.Spec.EnvFrom,
		VolumeMounts: jobManagerSpec.VolumeMounts,
		Lifecycle: &corev1.Lifecycle{
			PreStop: &corev1.LifecycleHandler{
				Exec: &corev1.ExecAction{
					Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
				},
			},
		},
	}

	if IsApplicationModeCluster(flinkCluster) {
		jobSpec := flinkCluster.Spec.Job
		status := flinkCluster.Status
		args := []string{"standalone-job"}

		if parallelism, err := calJobParallelism(flinkCluster); err == nil {
			args = append(args, fmt.Sprintf("-Dparallelism.default=%d", parallelism))
		}

		var fromSavepoint = convertFromSavepoint(jobSpec, status.Components.Job, &status.Revision)
		if fromSavepoint != nil {
			args = append(args, "--fromSavepoint", *fromSavepoint)
		}

		if jobSpec.AllowNonRestoredState != nil && *jobSpec.AllowNonRestoredState {
			args = append(args, "--allowNonRestoredState")
		}

		jobId, _ := GenJobId(flinkCluster)
		args = append(args,
			"--job-id", jobId,
			"--job-classname", *jobSpec.ClassName,
		)
		args = append(args, jobSpec.Args...)
		container.Args = args
	}

	return container
}

func newJobManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec {
	var clusterSpec = flinkCluster.Spec
	var imageSpec = clusterSpec.Image
	var serviceAccount = clusterSpec.ServiceAccountName
	var jobManagerSpec = clusterSpec.JobManager

	var podSpec = &corev1.PodSpec{
		InitContainers: convertContainers(jobManagerSpec.InitContainers, []corev1.VolumeMount{}, clusterSpec.EnvVars),
		Containers: []corev1.Container{*mainContainer},
		Volumes: jobManagerSpec.Volumes,
		Affinity: jobManagerSpec.Affinity,
		NodeSelector: jobManagerSpec.NodeSelector,
		Tolerations: jobManagerSpec.Tolerations,
		ImagePullSecrets: imageSpec.PullSecrets,
		SecurityContext: jobManagerSpec.SecurityContext,
		HostAliases: jobManagerSpec.HostAliases,
		ServiceAccountName: getServiceAccountName(serviceAccount),
		TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
	}

	setFlinkConfig(getConfigMapName(flinkCluster.Name), podSpec)
	setHadoopConfig(flinkCluster.Spec.HadoopConfig, podSpec)
	setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec)

	podSpec.Containers = append(podSpec.Containers, jobManagerSpec.Sidecars...)

	return podSpec
}

// Gets the desired JobManager StatefulSet spec from the FlinkCluster spec.
func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
	var jobManagerSpec = flinkCluster.Spec.JobManager
	var jobManagerStatefulSetName = getJobManagerName(flinkCluster.Name)
	var podLabels = getComponentLabels(flinkCluster, "jobmanager")
	podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
	var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	mainContainer := newJobManagerContainer(flinkCluster)
	podSpec := newJobManagerPodSpec(mainContainer, flinkCluster)

	var pvcs []corev1.PersistentVolumeClaim
	if jobManagerSpec.VolumeClaimTemplates != nil {
		pvcs = make([]corev1.PersistentVolumeClaim, len(jobManagerSpec.VolumeClaimTemplates))
		for i, pvc := range jobManagerSpec.VolumeClaimTemplates {
			pvc.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
			pvcs[i] = pvc
		}
	}

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: jobManagerStatefulSetName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: statefulSetLabels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: jobManagerSpec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: podLabels},
			ServiceName: jobManagerStatefulSetName,
			VolumeClaimTemplates: pvcs,
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
					Annotations: jobManagerSpec.PodAnnotations,
				},
				Spec: *podSpec,
			},
		},
	}
}

// Gets the desired JobManager service spec from a cluster spec.
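// The Service type follows spec.jobManager.accessScope: Cluster maps to
// ClusterIP, VPC to an internal LoadBalancer (via GKE annotations), External
// to a LoadBalancer, NodePort to NodePort, and Headless to a ClusterIP
// service with clusterIP set to None.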
func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	var jobManagerSpec = flinkCluster.Spec.JobManager
	var rpcPort = corev1.ServicePort{
		Name: "rpc",
		Port: *jobManagerSpec.Ports.RPC,
		TargetPort: intstr.FromString("rpc")}
	var blobPort = corev1.ServicePort{
		Name: "blob",
		Port: *jobManagerSpec.Ports.Blob,
		TargetPort: intstr.FromString("blob")}
	var queryPort = corev1.ServicePort{
		Name: "query",
		Port: *jobManagerSpec.Ports.Query,
		TargetPort: intstr.FromString("query")}
	var uiPort = corev1.ServicePort{
		Name: "ui",
		Port: *jobManagerSpec.Ports.UI,
		TargetPort: intstr.FromString("ui")}
	var jobManagerServiceName = getJobManagerServiceName(clusterName)

	selectorLabels := getComponentLabels(flinkCluster, "jobmanager")
	serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
	serviceLabels = mergeLabels(serviceLabels, jobManagerSpec.ServiceLabels)
	var serviceAnnotations = jobManagerSpec.ServiceAnnotations

	var jobManagerService = &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: clusterNamespace,
			Name: jobManagerServiceName,
			OwnerReferences: []metav1.OwnerReference{
				ToOwnerReference(flinkCluster)},
			Labels: serviceLabels,
			Annotations: serviceAnnotations,
		},
		Spec: corev1.ServiceSpec{
			Selector: selectorLabels,
			Ports: []corev1.ServicePort{rpcPort, blobPort, queryPort, uiPort},
		},
	}

	// This implementation is specific to GKE, see details at
	// https://cloud.google.com/kubernetes-engine/docs/how-to/exposing-apps
	// https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing
	switch jobManagerSpec.AccessScope {
	case v1beta1.AccessScopeCluster:
		jobManagerService.Spec.Type = corev1.ServiceTypeClusterIP
	case v1beta1.AccessScopeVPC:
		jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer
		jobManagerService.Annotations = mergeLabels(serviceAnnotations, map[string]string{
			"networking.gke.io/load-balancer-type": "Internal",
			"networking.gke.io/internal-load-balancer-allow-global-access": "true",
		})
	case v1beta1.AccessScopeExternal:
		jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer
	case v1beta1.AccessScopeNodePort:
		jobManagerService.Spec.Type = corev1.ServiceTypeNodePort
	case v1beta1.AccessScopeHeadless:
		// Headless services do not allocate any sort of VIP or LoadBalancer, and merely
		// collect a set of Pod IPs that are assumed to be independently routable.
		jobManagerService.Spec.Type = corev1.ServiceTypeClusterIP
		jobManagerService.Spec.ClusterIP = "None"
	default:
		panic(fmt.Sprintf(
			"Unknown service access scope: %v", jobManagerSpec.AccessScope))
	}

	return jobManagerService
}

// Gets the desired JobManager ingress spec from a cluster spec.
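// For example (illustrative values), a hostFormat of "{{$clusterName}}.example.com"
// on a cluster named "my-cluster" resolves to the ingress host
// "my-cluster.example.com"; see getJobManagerIngressHost.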
func newJobManagerIngress(
	flinkCluster *v1beta1.FlinkCluster) *networkingv1.Ingress {
	var jobManagerIngressSpec = flinkCluster.Spec.JobManager.Ingress
	if jobManagerIngressSpec == nil {
		return nil
	}

	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	var jobManagerServiceName = getJobManagerServiceName(clusterName)
	var ingressName = getJobManagerIngressName(clusterName)
	var ingressAnnotations = jobManagerIngressSpec.Annotations
	var ingressHost string
	var ingressTLS []networkingv1.IngressTLS
	var labels = mergeLabels(
		getComponentLabels(flinkCluster, "jobmanager"),
		getRevisionHashLabels(&flinkCluster.Status.Revision))
	var pathType = networkingv1.PathTypePrefix

	if jobManagerIngressSpec.HostFormat != nil {
		ingressHost = getJobManagerIngressHost(*jobManagerIngressSpec.HostFormat, clusterName)
	}

	if jobManagerIngressSpec.UseTLS != nil && *jobManagerIngressSpec.UseTLS {
		var secretName string
		var hosts []string
		if ingressHost != "" {
			hosts = []string{ingressHost}
		}
		if jobManagerIngressSpec.TLSSecretName != nil {
			secretName = *jobManagerIngressSpec.TLSSecretName
		}
		if hosts != nil || secretName != "" {
			ingressTLS = []networkingv1.IngressTLS{{
				Hosts: hosts,
				SecretName: secretName,
			}}
		}
	}

	var jobManagerIngress = &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: clusterNamespace,
			Name: ingressName,
			OwnerReferences: []metav1.OwnerReference{
				ToOwnerReference(flinkCluster)},
			Labels: labels,
			Annotations: ingressAnnotations,
		},
		Spec: networkingv1.IngressSpec{
			TLS: ingressTLS,
			Rules: []networkingv1.IngressRule{{
				Host: ingressHost,
				IngressRuleValue: networkingv1.IngressRuleValue{
					HTTP: &networkingv1.HTTPIngressRuleValue{
						Paths: []networkingv1.HTTPIngressPath{{
							Path: "/",
							PathType: &pathType,
							Backend: networkingv1.IngressBackend{
								Service: &networkingv1.IngressServiceBackend{
									Name: jobManagerServiceName,
									Port: networkingv1.ServiceBackendPort{
										Name: "ui",
									},
								},
							},
						}},
					},
				},
			}},
		},
	}

	return jobManagerIngress
}

func newTaskManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
	var imageSpec = flinkCluster.Spec.Image
	var taskManagerSpec = flinkCluster.Spec.TaskManager
	var dataPort = corev1.ContainerPort{Name: "data", ContainerPort: *taskManagerSpec.Ports.Data}
	var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *taskManagerSpec.Ports.RPC}
	var queryPort = corev1.ContainerPort{Name: "query", ContainerPort: *taskManagerSpec.Ports.Query}
	var ports = []corev1.ContainerPort{dataPort, rpcPort, queryPort}

	for _, port := range taskManagerSpec.ExtraPorts {
		ports = append(ports, corev1.ContainerPort{Name: port.Name, ContainerPort: port.ContainerPort, Protocol: corev1.Protocol(port.Protocol)})
	}

	return &corev1.Container{
		Name: "taskmanager",
		Image: imageSpec.Name,
		ImagePullPolicy: imageSpec.PullPolicy,
		Args: []string{"taskmanager"},
		Ports: ports,
		LivenessProbe: taskManagerSpec.LivenessProbe,
		ReadinessProbe: taskManagerSpec.ReadinessProbe,
		Resources: taskManagerSpec.Resources,
		Env: flinkCluster.Spec.EnvVars,
		EnvFrom: flinkCluster.Spec.EnvFrom,
		VolumeMounts: taskManagerSpec.VolumeMounts,
		Lifecycle: &corev1.Lifecycle{
			PreStop: &corev1.LifecycleHandler{
				Exec: &corev1.ExecAction{
					Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)},
				},
			},
		},
	}
}

func newTaskManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec {
	var clusterSpec = flinkCluster.Spec
	var imageSpec = flinkCluster.Spec.Image
	var serviceAccount = clusterSpec.ServiceAccountName
	var taskManagerSpec = flinkCluster.Spec.TaskManager
	var podSpec = &corev1.PodSpec{
		InitContainers: convertContainers(taskManagerSpec.InitContainers, []corev1.VolumeMount{}, clusterSpec.EnvVars),
		Containers: []corev1.Container{*mainContainer},
		Volumes: taskManagerSpec.Volumes,
		Affinity: taskManagerSpec.Affinity,
		NodeSelector: taskManagerSpec.NodeSelector,
		Tolerations: taskManagerSpec.Tolerations,
		ImagePullSecrets: imageSpec.PullSecrets,
		SecurityContext: taskManagerSpec.SecurityContext,
		HostAliases: taskManagerSpec.HostAliases,
		ServiceAccountName: getServiceAccountName(serviceAccount),
		TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
	}

	setFlinkConfig(getConfigMapName(flinkCluster.Name), podSpec)
	setHadoopConfig(flinkCluster.Spec.HadoopConfig, podSpec)
	setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec)

	podSpec.Containers = append(podSpec.Containers, taskManagerSpec.Sidecars...)

	return podSpec
}

// Gets the desired TaskManager StatefulSet spec from a cluster spec.
func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
	var taskManagerSpec = flinkCluster.Spec.TaskManager
	var taskManagerStatefulSetName = getTaskManagerName(flinkCluster.Name)
	var podLabels = getComponentLabels(flinkCluster, "taskmanager")
	podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
	var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	mainContainer := newTaskManagerContainer(flinkCluster)
	podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)

	var pvcs []corev1.PersistentVolumeClaim
	if taskManagerSpec.VolumeClaimTemplates != nil {
		pvcs = make([]corev1.PersistentVolumeClaim, len(taskManagerSpec.VolumeClaimTemplates))
		for i, pvc := range taskManagerSpec.VolumeClaimTemplates {
			pvc.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
			pvcs[i] = pvc
		}
	}

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: taskManagerStatefulSetName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: statefulSetLabels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: taskManagerSpec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: podLabels},
			ServiceName: taskManagerStatefulSetName,
			VolumeClaimTemplates: pvcs,
			PodManagementPolicy: "Parallel",
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
					Annotations: taskManagerSpec.PodAnnotations,
				},
				Spec: *podSpec,
			},
		},
	}
}

func getEphemeralVolumesFromTaskManagerSpec(flinkCluster *v1beta1.FlinkCluster, labels map[string]string) []corev1.Volume {
	var ephemeralVolumes []corev1.Volume
	var volumeClaimsInSpec = flinkCluster.Spec.TaskManager.VolumeClaimTemplates

	for _, volume := range volumeClaimsInSpec {
		ephemeralVolumes = append(ephemeralVolumes, corev1.Volume{
			Name: volume.Name,
			// Ephemeral volume
			VolumeSource: corev1.VolumeSource{
				Ephemeral: &corev1.EphemeralVolumeSource{
					VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
						ObjectMeta: metav1.ObjectMeta{
							Labels: labels,
						},
						Spec: volume.Spec,
					},
				},
			},
		})
	}

	return ephemeralVolumes
}

// Gets the desired TaskManager Deployment spec from a cluster spec.
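// Unlike a StatefulSet, a Deployment cannot declare volumeClaimTemplates, so any
// templates in the TaskManager spec are converted to generic ephemeral volumes
// (see getEphemeralVolumesFromTaskManagerSpec) and appended to the pod spec.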
func newTaskManagerDeployment(flinkCluster *v1beta1.FlinkCluster) *appsv1.Deployment {
	var taskManagerSpec = flinkCluster.Spec.TaskManager
	var taskManagerDeploymentName = getTaskManagerName(flinkCluster.Name)
	var podLabels = getComponentLabels(flinkCluster, "taskmanager")
	podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
	var deploymentLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	mainContainer := newTaskManagerContainer(flinkCluster)
	podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)
	podSpec.Volumes = append(podSpec.Volumes, getEphemeralVolumesFromTaskManagerSpec(flinkCluster, podLabels)...)

	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: taskManagerDeploymentName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: deploymentLabels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: taskManagerSpec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: podLabels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
					Annotations: taskManagerSpec.PodAnnotations,
				},
				Spec: *podSpec,
			},
		},
	}
}

// Gets the desired PodDisruptionBudget.
func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDisruptionBudget {
	pdbSpec := flinkCluster.Spec.PodDisruptionBudget
	if pdbSpec == nil {
		return nil
	}

	selectorLabels := getClusterLabels(flinkCluster)
	labels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	if pdbSpec.Selector == nil {
		pdbSpec.Selector = new(metav1.LabelSelector)
	}
	if pdbSpec.Selector.MatchLabels == nil {
		pdbSpec.Selector.MatchLabels = selectorLabels
	} else {
		for k, v := range selectorLabels {
			pdbSpec.Selector.MatchLabels[k] = v
		}
	}

	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: getPodDisruptionBudgetName(flinkCluster.Name),
			OwnerReferences: []metav1.OwnerReference{
				ToOwnerReference(flinkCluster),
			},
			Labels: labels,
		},
		Spec: *pdbSpec,
	}
}

// Gets the desired HorizontalPodAutoscaler.
func newHorizontalPodAutoscaler(flinkCluster *v1beta1.FlinkCluster) *autoscalingv2.HorizontalPodAutoscaler {
	hpaSpec := flinkCluster.Spec.TaskManager.HorizontalPodAutoscaler
	if hpaSpec == nil {
		return nil
	}

	selectorLabels := getClusterLabels(flinkCluster)
	labels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	return &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: getHorizontalPodAutoscalerName(flinkCluster.Name),
			OwnerReferences: []metav1.OwnerReference{
				ToOwnerReference(flinkCluster),
			},
			Labels: labels,
		},
		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
			MaxReplicas: hpaSpec.MaxReplicas,
			MinReplicas: hpaSpec.MinReplicas,
			Metrics: hpaSpec.Metrics,
			Behavior: hpaSpec.Behavior,
			ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
				APIVersion: flinkCluster.APIVersion,
				Kind: flinkCluster.Kind,
				Name: flinkCluster.Name,
			},
		},
	}
}

// Gets the desired TaskManager Headless Service.
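// The service is headless (clusterIP: None) and its name matches the
// serviceName of the TaskManager StatefulSet, which gives every TaskManager
// pod a stable DNS record.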
func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
	var tmSpec = flinkCluster.Spec.TaskManager
	if tmSpec == nil {
		return nil
	}

	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	// Service name matches the service name defined in the TM StatefulSet spec.
	var tmSvcName = getTaskManagerName(clusterName)

	selectorLabels := getComponentLabels(flinkCluster, "taskmanager")
	serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	var tmSvcPorts = []corev1.ServicePort{
		{
			Name: "data",
			Port: *tmSpec.Ports.Data,
		},
		{
			Name: "rpc",
			Port: *tmSpec.Ports.RPC,
		},
		{
			Name: "query",
			Port: *tmSpec.Ports.Query,
		},
	}

	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: clusterNamespace,
			Name: tmSvcName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: serviceLabels,
		},
		Spec: corev1.ServiceSpec{
			Selector: selectorLabels,
			ClusterIP: corev1.ClusterIPNone,
			Type: corev1.ServiceTypeClusterIP,
			Ports: tmSvcPorts,
		},
	}
}

// Gets the desired configMap.
func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
	appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)
	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	var flinkProperties = flinkCluster.Spec.FlinkProperties
	var jmPorts = flinkCluster.Spec.JobManager.Ports
	var tmPorts = flinkCluster.Spec.TaskManager.Ports
	var configMapName = getConfigMapName(clusterName)
	var labels = mergeLabels(
		getClusterLabels(flinkCluster),
		getRevisionHashLabels(&flinkCluster.Status.Revision))

	// Properties that must be derived from the actual deployed environment.
	var flinkProps = map[string]string{
		"jobmanager.rpc.address": getJobManagerServiceName(clusterName),
		"jobmanager.rpc.port": strconv.FormatInt(int64(*jmPorts.RPC), 10),
		"blob.server.port": strconv.FormatInt(int64(*jmPorts.Blob), 10),
		"query.server.port": strconv.FormatInt(int64(*jmPorts.Query), 10),
		"rest.port": strconv.FormatInt(int64(*jmPorts.UI), 10),
		"taskmanager.rpc.port": strconv.FormatInt(int64(*tmPorts.RPC), 10),
	}

	if appVersion == nil || appVersion.LessThan(v10) {
		var flinkHeapSize = calFlinkHeapSize(flinkCluster)
		if flinkHeapSize["jobmanager.heap.size"] != "" {
			flinkProps["jobmanager.heap.size"] = flinkHeapSize["jobmanager.heap.size"]
		}
		if flinkHeapSize["taskmanager.heap.size"] != "" {
			flinkProps["taskmanager.heap.size"] = flinkHeapSize["taskmanager.heap.size"]
		}
	} else {
		var flinkProcessMemorySize = calFlinkMemoryProcessSize(flinkCluster)
		if flinkProcessMemorySize["jobmanager.memory.process.size"] != "" {
			flinkProps["jobmanager.memory.process.size"] = flinkProcessMemorySize["jobmanager.memory.process.size"]
		}
		if flinkProcessMemorySize["taskmanager.memory.process.size"] != "" {
			flinkProps["taskmanager.memory.process.size"] = flinkProcessMemorySize["taskmanager.memory.process.size"]
		}
	}

	if taskSlots, err := calTaskManagerTaskSlots(flinkCluster); err == nil {
		flinkProps["taskmanager.numberOfTaskSlots"] = strconv.Itoa(int(taskSlots))
	}

	// Add custom Flink properties.
	for k, v := range flinkProperties {
		// Do not allow overriding properties derived from the deployed environment.
		if _, ok := flinkSysProps[k]; ok {
			continue
		}
		flinkProps[k] = v
	}

	var configData = getLogConf(flinkCluster.Spec)
	configData["flink-conf.yaml"] = getFlinkProperties(flinkProps)
	configData["submit-job.sh"] = submitJobScript

	var configMap = &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: clusterNamespace,
			Name: configMapName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: labels,
		},
		Data: configData,
	}

	return configMap
}

func newJobSubmitterPodSpec(flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec {
	var jobSpec = flinkCluster.Spec.Job
	if jobSpec == nil {
		return nil
	}

	var status = flinkCluster.Status
	var clusterSpec = flinkCluster.Spec
	var imageSpec = clusterSpec.Image
	var serviceAccount = clusterSpec.ServiceAccountName
	var jobManagerSpec = clusterSpec.JobManager
	var clusterName = flinkCluster.Name
	var jobManagerServiceName = getJobManagerServiceName(clusterName)
	var jobManagerAddress = fmt.Sprintf(
		"%s:%d", jobManagerServiceName, *jobManagerSpec.Ports.UI)

	var jobArgs = []string{"bash", submitJobScriptPath}
	jobArgs = append(jobArgs, "--jobmanager", jobManagerAddress)
	if jobSpec.ClassName != nil {
		jobArgs = append(jobArgs, "--class", *jobSpec.ClassName)
	}

	var fromSavepoint = convertFromSavepoint(jobSpec, status.Components.Job, &status.Revision)
	if fromSavepoint != nil {
		jobArgs = append(jobArgs, "--fromSavepoint", *fromSavepoint)
	}

	if jobSpec.AllowNonRestoredState != nil && *jobSpec.AllowNonRestoredState {
		jobArgs = append(jobArgs, "--allowNonRestoredState")
	}

	if parallelism, err := calJobParallelism(flinkCluster); err == nil {
		jobArgs = append(jobArgs, "--parallelism", fmt.Sprint(parallelism))
	}

	if jobSpec.NoLoggingToStdout != nil && *jobSpec.NoLoggingToStdout {
		jobArgs = append(jobArgs, "--sysoutLogging")
	}

	if jobSpec.Mode != nil && *jobSpec.Mode == v1beta1.JobModeDetached {
		jobArgs = append(jobArgs, "--detached")
	}

	if jobSpec.ClassPath != nil && len(jobSpec.ClassPath) > 0 {
		for _, u := range jobSpec.ClassPath {
			jobArgs = append(jobArgs, "-C", u)
		}
	}

	envVars := []corev1.EnvVar{{
		Name: jobManagerAddrEnvVar,
		Value: jobManagerAddress,
	}}
	envVars = append(envVars, flinkCluster.Spec.EnvVars...)

	var volumes []corev1.Volume
	var volumeMounts []corev1.VolumeMount
	volumes = append(volumes, jobSpec.Volumes...)
	volumeMounts = append(volumeMounts, jobSpec.VolumeMounts...)

	// Submit job script config.
	sbsVolume, sbsMount, confMount := convertSubmitJobScript(clusterName)
	volumes = append(volumes, *sbsVolume)
	volumeMounts = append(volumeMounts, *sbsMount, *confMount)

	if jobSpec.JarFile != nil {
		jobArgs = append(jobArgs, *jobSpec.JarFile)
	}

	if jobSpec.PyFile != nil {
		jobArgs = append(jobArgs, "--python", *jobSpec.PyFile)
	}

	if jobSpec.PyFiles != nil {
		jobArgs = append(jobArgs, "--pyFiles", *jobSpec.PyFiles)
	}

	if jobSpec.PyModule != nil {
		jobArgs = append(jobArgs, "--pyModule", *jobSpec.PyModule)
	}

	jobArgs = append(jobArgs, jobSpec.Args...)
	podSpec := &corev1.PodSpec{
		InitContainers: convertContainers(jobSpec.InitContainers, volumeMounts, envVars),
		Containers: []corev1.Container{
			{
				Name: "main",
				Image: imageSpec.Name,
				ImagePullPolicy: imageSpec.PullPolicy,
				Args: jobArgs,
				Env: envVars,
				EnvFrom: flinkCluster.Spec.EnvFrom,
				VolumeMounts: volumeMounts,
				Resources: jobSpec.Resources,
			},
		},
		RestartPolicy: corev1.RestartPolicyNever,
		Volumes: volumes,
		ImagePullSecrets: imageSpec.PullSecrets,
		SecurityContext: jobSpec.SecurityContext,
		HostAliases: jobSpec.HostAliases,
		ServiceAccountName: getServiceAccountName(serviceAccount),
		Affinity: jobSpec.Affinity,
		NodeSelector: jobSpec.NodeSelector,
		Tolerations: jobSpec.Tolerations,
	}

	setFlinkConfig(getConfigMapName(flinkCluster.Name), podSpec)
	setHadoopConfig(flinkCluster.Spec.HadoopConfig, podSpec)
	setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec)

	return podSpec
}

func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
	jobSpec := flinkCluster.Spec.Job
	if jobSpec == nil {
		return nil
	}

	recorded := flinkCluster.Status
	jobManagerSpec := flinkCluster.Spec.JobManager
	labels := getClusterLabels(flinkCluster)
	labels = mergeLabels(labels, getRevisionHashLabels(&recorded.Revision))

	var jobName string
	var annotations map[string]string
	var podSpec *corev1.PodSpec
	if IsApplicationModeCluster(flinkCluster) {
		jobId, _ := GenJobId(flinkCluster)
		labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
		labels = mergeLabels(labels, jobManagerSpec.PodLabels)
		labels = mergeLabels(labels, map[string]string{JobIdLabel: jobId})
		jobName = getJobManagerJobName(flinkCluster.Name)
		annotations = jobManagerSpec.PodAnnotations
		mainContainer := newJobManagerContainer(flinkCluster)
		podSpec = newJobManagerPodSpec(mainContainer, flinkCluster)
	} else {
		jobName = getSubmitterJobName(flinkCluster.Name)
		labels = mergeLabels(labels, jobSpec.PodLabels)
		annotations = jobSpec.PodAnnotations
		podSpec = newJobSubmitterPodSpec(flinkCluster)
	}

	// Disable the retry mechanism of the k8s Job; all retries should be initiated
	// by the operator based on the job restart policy. Flink jobs are stateful:
	// if a job fails after running for 10 hours, we probably don't want to start
	// over from the beginning. Instead we want to resume from the latest
	// savepoint, which strictly speaking makes it a different job from the
	// previous one, because the `--fromSavepoint` parameter has changed.
	podSpec.RestartPolicy = corev1.RestartPolicyNever

	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name: jobName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels: labels,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
					Annotations: annotations,
				},
				Spec: *podSpec,
			},
			BackoffLimit: &backoffLimit,
		},
	}
}

// Decides from which savepoint the Flink job should be restored when the job is
// created, updated or restarted.
//
// case 1) Restore the job from the user-provided savepoint.
// When the FlinkCluster is created or updated and spec.job.fromSavepoint is
// specified, the Flink job will be restored from it.
//
// case 2) Restore the Flink job from the latest savepoint.
// When the FlinkCluster is updated with no spec.job.fromSavepoint, or the job is
// restarted from the failed state, the Flink job will be restored from the
// latest savepoint created by the operator.
//
// case 3) When the latest created savepoint is unavailable, use the savepoint
// from which the current job was restored.
func convertFromSavepoint(jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus, revision *v1beta1.RevisionStatus) *string {
	switch {
	// Updating with FromSavepoint provided
	case revision.IsUpdateTriggered() && !util.IsBlank(jobSpec.FromSavepoint):
		return jobSpec.FromSavepoint
	// Latest savepoint
	case jobStatus != nil && jobStatus.SavepointLocation != "":
		return &jobStatus.SavepointLocation
	// The savepoint from which the current job was restored
	case jobStatus != nil && jobStatus.FromSavepoint != "":
		return &jobStatus.FromSavepoint
	}

	// Creating for the first time or other situations
	if !util.IsBlank(jobSpec.FromSavepoint) {
		return jobSpec.FromSavepoint
	}

	return nil
}

func appendVolumes(volumes []corev1.Volume, newVolumes ...corev1.Volume) []corev1.Volume {
	for _, newVolume := range newVolumes {
		var conflict = false
		for _, volume := range volumes {
			if newVolume.Name == volume.Name {
				conflict = true
				break
			}
		}
		if !conflict {
			volumes = append(volumes, newVolume)
		}
	}

	return volumes
}

func appendVolumeMounts(volumeMounts []corev1.VolumeMount, newVolumeMounts ...corev1.VolumeMount) []corev1.VolumeMount {
	for _, newMount := range newVolumeMounts {
		var conflict = false
		for _, mount := range volumeMounts {
			if newMount.MountPath == mount.MountPath {
				conflict = true
				break
			}
		}
		if !conflict {
			volumeMounts = append(volumeMounts, newMount)
		}
	}

	return volumeMounts
}

func appendEnvVars(envVars []corev1.EnvVar, newEnvVars ...corev1.EnvVar) []corev1.EnvVar {
	for _, envVar := range newEnvVars {
		var conflict = false
		for _, env := range envVars {
			if envVar.Name == env.Name {
				conflict = true
				break
			}
		}
		if !conflict {
			envVars = append(envVars, envVar)
		}
	}

	return envVars
}

// Copy any non-duplicate volume mounts and env vars to each specified container
func convertContainer(container corev1.Container, volumeMounts []corev1.VolumeMount, envVars []corev1.EnvVar) corev1.Container {
	container.VolumeMounts = appendVolumeMounts(container.VolumeMounts, volumeMounts...)
	container.Env = appendEnvVars(container.Env, envVars...)
	return container
}

// Copy any non-duplicate volume mounts and env vars to the specified containers
func convertContainers(containers []corev1.Container, volumeMounts []corev1.VolumeMount, envVars []corev1.EnvVar) []corev1.Container {
	var updatedContainers = []corev1.Container{}
	for _, container := range containers {
		updatedContainer := convertContainer(container, volumeMounts, envVars)
		updatedContainers = append(updatedContainers, updatedContainer)
	}
	return updatedContainers
}

// Converts the FlinkCluster as owner reference for its child resources.
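// Controller is set so the FlinkCluster is recorded as the managing controller
// of each child resource, and BlockOwnerDeletion is left false so children do
// not delay deletion of the cluster.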
func ToOwnerReference(
	flinkCluster *v1beta1.FlinkCluster) metav1.OwnerReference {
	return metav1.OwnerReference{
		APIVersion: flinkCluster.APIVersion,
		Kind: flinkCluster.Kind,
		Name: flinkCluster.Name,
		UID: flinkCluster.UID,
		Controller: &[]bool{true}[0],
		BlockOwnerDeletion: &[]bool{false}[0],
	}
}

// Gets Flink properties rendered as flink-conf.yaml content.
func getFlinkProperties(properties map[string]string) string {
	var keys = make([]string, len(properties))
	i := 0
	for k := range properties {
		keys[i] = k
		i++
	}
	sort.Strings(keys)

	var builder strings.Builder
	for _, key := range keys {
		builder.WriteString(fmt.Sprintf("%s: %s\n", key, properties[key]))
	}
	return builder.String()
}

var jobManagerIngressHostRegex = regexp.MustCompile(`{{\s*[$]clusterName\s*}}`)

func getJobManagerIngressHost(ingressHostFormat string, clusterName string) string {
	// TODO: Validating webhook should verify hostFormat
	return jobManagerIngressHostRegex.ReplaceAllString(ingressHostFormat, clusterName)
}

// Checks whether the component should be deleted according to the cleanup
// policy. Always returns false for a session cluster.
func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
	var jobStatus = cluster.Status.Components.Job

	// Session cluster.
	if jobStatus == nil {
		return false
	}

	if cluster.Status.Revision.IsUpdateTriggered() {
		return false
	}

	var action v1beta1.CleanupAction
	switch jobStatus.State {
	case v1beta1.JobStateSucceeded:
		action = cluster.Spec.Job.CleanupPolicy.AfterJobSucceeds
	case v1beta1.JobStateFailed, v1beta1.JobStateLost, v1beta1.JobStateDeployFailed:
		action = cluster.Spec.Job.CleanupPolicy.AfterJobFails
	case v1beta1.JobStateCancelled:
		action = cluster.Spec.Job.CleanupPolicy.AfterJobCancelled
	default:
		return false
	}

	switch action {
	case v1beta1.CleanupActionDeleteCluster:
		return true
	case v1beta1.CleanupActionDeleteTaskManager:
		return component == "TaskManager"
	}

	return false
}

func calJobParallelism(cluster *v1beta1.FlinkCluster) (int32, error) {
	if cluster.Spec.Job.Parallelism != nil {
		return *cluster.Spec.Job.Parallelism, nil
	}

	value, err := calTaskManagerTaskSlots(cluster)
	if err != nil {
		return 0, err
	}
	parallelism := *cluster.Spec.TaskManager.Replicas * value
	return parallelism, nil
}

func calTaskManagerTaskSlots(cluster *v1beta1.FlinkCluster) (int32, error) {
	if ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"]; ok {
		parsed, err := strconv.ParseInt(ts, 10, 32)
		if err != nil {
			return 0, err
		}
		return int32(parsed), nil
	}

	resources := cluster.Spec.TaskManager.GetResources()
	slots := int32(resources.Cpu().Value()) / 2
	if slots == 0 {
		return 1, nil
	}
	return slots, nil
}

func calFlinkHeapSize(cluster *v1beta1.FlinkCluster) map[string]string {
	jm := cluster.Spec.JobManager
	tm := cluster.Spec.TaskManager
	if jm.MemoryOffHeapRatio == nil || tm.MemoryOffHeapRatio == nil {
		return nil
	}

	var flinkHeapSize = make(map[string]string)

	jmHeapSizeMB := calHeapSize(
		jm.GetResources().Memory().Value(),
		jm.MemoryOffHeapMin.Value(),
		int64(*jm.MemoryOffHeapRatio))
	if jmHeapSizeMB > 0 {
		flinkHeapSize["jobmanager.heap.size"] = strconv.FormatInt(jmHeapSizeMB, 10) + "m"
	}

	tmHeapSizeMB := calHeapSize(
		tm.GetResources().Memory().Value(),
		tm.MemoryOffHeapMin.Value(),
		int64(*tm.MemoryOffHeapRatio))
	if tmHeapSizeMB > 0 {
		flinkHeapSize["taskmanager.heap.size"] = strconv.FormatInt(tmHeapSizeMB, 10) + "m"
	}

	return flinkHeapSize
}
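
// Illustrative example of the pre-1.10 heap calculation above: with a
// TaskManager memory resource of 4Gi, memoryOffHeapRatio of 25 and
// memoryOffHeapMin of 600Mi, the off-heap reservation is
// max(4Gi*25/100, 600Mi) = 1Gi, so "taskmanager.heap.size" becomes "3072m".
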
// Converts a memory quantity to units of the given divisor and returns the ceiling of the value.
func convertResourceMemoryToInt64(memory resource.Quantity, divisor resource.Quantity) int64 {
	return int64(math.Ceil(float64(memory.Value()) / float64(divisor.Value())))
}

// Calculate heap size in MB
func calHeapSize(memSize int64, offHeapMin int64, offHeapRatio int64) int64 {
	var heapSizeMB int64
	offHeapSize := int64(math.Ceil(float64(memSize*offHeapRatio) / 100))
	if offHeapSize < offHeapMin {
		offHeapSize = offHeapMin
	}
	heapSizeCalculated := memSize - offHeapSize
	if heapSizeCalculated > 0 {
		divisor := resource.MustParse("1Mi")
		heapSizeQuantity := resource.NewQuantity(heapSizeCalculated, resource.DecimalSI)
		heapSizeMB = convertResourceMemoryToInt64(*heapSizeQuantity, divisor)
	}
	return heapSizeMB
}

func calProcessMemorySize(memSize, ratio int64) int64 {
	size := int64(math.Ceil(float64(memSize*ratio) / 100))
	divisor := resource.MustParse("1Mi")
	quantity := resource.NewQuantity(size, resource.DecimalSI)
	return convertResourceMemoryToInt64(*quantity, divisor)
}

// Calculate process memory size in MB
func calFlinkMemoryProcessSize(cluster *v1beta1.FlinkCluster) map[string]string {
	var flinkProcessMemory = make(map[string]string)
	jm := cluster.Spec.JobManager
	tm := cluster.Spec.TaskManager

	jmMemoryByte := jm.GetResources().Memory().Value()
	jmRatio := int64(*jm.MemoryProcessRatio)
	jmSizeMB := calProcessMemorySize(jmMemoryByte, jmRatio)
	if jmSizeMB > 0 {
		flinkProcessMemory["jobmanager.memory.process.size"] = strconv.FormatInt(jmSizeMB, 10) + "m"
	}

	tmMemByte := tm.GetResources().Memory().Value()
	ratio := int64(*tm.MemoryProcessRatio)
	sizeMB := calProcessMemorySize(tmMemByte, ratio)
	if sizeMB > 0 {
		flinkProcessMemory["taskmanager.memory.process.size"] = strconv.FormatInt(sizeMB, 10) + "m"
	}

	return flinkProcessMemory
}

func setFlinkConfig(name string, podSpec *corev1.PodSpec) bool {
	var envVars []corev1.EnvVar

	volumes := []corev1.Volume{{
		Name: flinkConfigMapVolume,
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: name,
				},
			},
		},
	}}

	volumeMounts := []corev1.VolumeMount{{
		Name: flinkConfigMapVolume,
		MountPath: flinkConfigMapPath,
	}}

	podSpec.Containers = convertContainers(podSpec.Containers, volumeMounts, envVars)
	podSpec.InitContainers = convertContainers(podSpec.InitContainers, volumeMounts, envVars)
	podSpec.Volumes = appendVolumes(podSpec.Volumes, volumes...)
	return true
}

func convertSubmitJobScript(clusterName string) (*corev1.Volume, *corev1.VolumeMount, *corev1.VolumeMount) {
	confVol := &corev1.Volume{
		Name: flinkConfigMapVolume,
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: getConfigMapName(clusterName),
				},
			},
		},
	}
	scriptMount := &corev1.VolumeMount{
		Name: flinkConfigMapVolume,
		MountPath: submitJobScriptPath,
		SubPath: "submit-job.sh",
	}
	confMount := &corev1.VolumeMount{
		Name: flinkConfigMapVolume,
		MountPath: flinkConfigMapPath,
	}
	return confVol, scriptMount, confMount
}

func setHadoopConfig(hadoopConfig *v1beta1.HadoopConfig, podSpec *corev1.PodSpec) bool {
	if hadoopConfig == nil {
		return false
	}

	var volumes = []corev1.Volume{{
		Name: hadoopConfigVolume,
		VolumeSource: corev1.VolumeSource{
			ConfigMap: &corev1.ConfigMapVolumeSource{
				LocalObjectReference: corev1.LocalObjectReference{
					Name: hadoopConfig.ConfigMapName,
				},
			},
		},
	}}
	var volumeMounts = []corev1.VolumeMount{{
		Name: hadoopConfigVolume,
		MountPath: hadoopConfig.MountPath,
		ReadOnly: true,
	}}
	var envVars = []corev1.EnvVar{{
		Name: hadoopConfDirEnvVar,
		Value: hadoopConfig.MountPath,
	}}

	podSpec.Containers = convertContainers(podSpec.Containers, volumeMounts, envVars)
	podSpec.InitContainers = convertContainers(podSpec.InitContainers, volumeMounts, envVars)
	podSpec.Volumes = appendVolumes(podSpec.Volumes, volumes...)

	return true
}

func setGCPConfig(gcpConfig *v1beta1.GCPConfig, podSpec *corev1.PodSpec) bool {
	if gcpConfig == nil {
		return false
	}

	var saConfig = gcpConfig.ServiceAccount
	var saVolume = corev1.Volume{
		Name: gcpServiceAccountVolume,
		VolumeSource: corev1.VolumeSource{
			Secret: &corev1.SecretVolumeSource{
				SecretName: gcpConfig.ServiceAccount.SecretName,
			},
		},
	}
	var saMount = corev1.VolumeMount{
		Name: gcpServiceAccountVolume,
		MountPath: gcpConfig.ServiceAccount.MountPath,
		ReadOnly: true,
	}
	if !strings.HasSuffix(saMount.MountPath, "/") {
		saMount.MountPath = saMount.MountPath + "/"
	}

	var saEnv = corev1.EnvVar{
		Name: gacEnvVar,
		Value: saMount.MountPath + saConfig.KeyFile,
	}

	volumes := []corev1.Volume{saVolume}
	volumeMounts := []corev1.VolumeMount{saMount}
	envVars := []corev1.EnvVar{saEnv}

	podSpec.Containers = convertContainers(podSpec.Containers, volumeMounts, envVars)
	podSpec.InitContainers = convertContainers(podSpec.InitContainers, volumeMounts, envVars)
	podSpec.Volumes = appendVolumes(podSpec.Volumes, volumes...)
	return true
}

func getClusterLabels(cluster *v1beta1.FlinkCluster) map[string]string {
	return map[string]string{
		"cluster": cluster.Name,
		"app": "flink",
	}
}

func getServiceAccountName(serviceAccount *string) string {
	if serviceAccount != nil {
		return *serviceAccount
	}
	return ""
}

func getComponentLabels(cluster *v1beta1.FlinkCluster, component string) map[string]string {
	return mergeLabels(getClusterLabels(cluster), map[string]string{
		"component": component,
	})
}

func getRevisionHashLabels(r *v1beta1.RevisionStatus) map[string]string {
	return map[string]string{
		RevisionNameLabel: getNextRevisionName(r),
	}
}

func mergeLabels(labels1 map[string]string, labels2 map[string]string) map[string]string {
	var mergedLabels = make(map[string]string)
	for k, v := range labels1 {
		mergedLabels[k] = v
	}
	for k, v := range labels2 {
		mergedLabels[k] = v
	}
	return mergedLabels
}

const (
	DefaultLog4jConfig = `log4j.rootLogger=INFO, console
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console`

	DefaultLogbackConfig = `<configuration>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="console"/>
    </root>
    <logger name="akka" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="console"/>
    </logger>
</configuration>`
)

// TODO: Wouldn't it be better to create a file, put it in the operator image, and read it from there?
// Provide logging profiles
func getLogConf(spec v1beta1.FlinkClusterSpec) map[string]string {
	result := spec.LogConfig
	if result == nil {
		result = make(map[string]string, 2)
	}
	if _, isPresent := result["log4j-console.properties"]; !isPresent {
		result["log4j-console.properties"] = DefaultLog4jConfig
	}
	if _, isPresent := result["log4j-cli.properties"]; !isPresent {
		result["log4j-cli.properties"] = DefaultLog4jConfig
	}
	if _, isPresent := result["logback-console.xml"]; !isPresent {
		result["logback-console.xml"] = DefaultLogbackConfig
	}
	return result
}