pkg/flink/resources.go (284 lines of code) (raw):

// Copyright 2021 Spotify AB. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package flink import ( "bytes" "net/url" "strings" "text/template" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "sigs.k8s.io/controller-runtime/pkg/client" corev1 "k8s.io/api/core/v1" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils" flinkOp "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" flinkIdl "github.com/spotify/flyte-flink-plugin/gen/pb-go/flyteidl-flink" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var ( containerTmpl = template.New("container-template").Funcs(template.FuncMap{"join": strings.Join}) flinkPropertiesTmpl = template.New("flink-properties-template").Funcs(template.FuncMap{"join": strings.Join}) stagedJarsEnvVarName = "STAGED_JARS" ) type ContainerTemplateData struct { ArtifactsByScheme map[string][]string Artifacts []string } func NewContainerTemplateData(artifacts []string) *ContainerTemplateData { return &ContainerTemplateData{ ArtifactsByScheme: GroupByScheme(artifacts), Artifacts: artifacts, } } type FlinkPropertiesTemplateData struct { Namespace string ClusterName ClusterName Labels map[string]string } type ObjectForPatch struct { Metadata ObjectMetaForPatch `json:"metadata"` } type ObjectMetaForPatch struct { Annotations map[string]interface{} `json:"annotations"` } func NewFlinkPropertiesTemplateData(namespace string, clusterName ClusterName, labels map[string]string) *FlinkPropertiesTemplateData { return &FlinkPropertiesTemplateData{ Namespace: namespace, ClusterName: clusterName, Labels: labels, } } func GroupByScheme(artifacts []string) map[string][]string { groupBy := make(map[string][]string) for _, artifact := range artifacts { url, _ := url.Parse(artifact) groupBy[url.Scheme] = append(groupBy[url.Scheme], artifact) } return groupBy } type FlinkCluster flinkOp.FlinkCluster func getPersistentVolumeClaim(name string, pv *flinkIdl.Resource_PersistentVolume) corev1.PersistentVolumeClaim { storageClass := strings.ReplaceAll(strings.ToLower(pv.GetType().String()), "_", "-") if pv.GetSize() == nil { return corev1.PersistentVolumeClaim{} } storageSize := resource.MustParse(pv.GetSize().GetString_()) return corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: corev1.PersistentVolumeClaimSpec{ AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceStorage: storageSize, }, }, StorageClassName: &storageClass, }, } } func addPersistentVolumeClaim( claims []corev1.PersistentVolumeClaim, volumeMounts []corev1.VolumeMount, claim corev1.PersistentVolumeClaim, mountPath string) ([]corev1.PersistentVolumeClaim, []corev1.VolumeMount) { claimsByName := make(map[string]corev1.PersistentVolumeClaim) for _, c := range claims { claimsByName[c.Name] = c } mounts := []corev1.VolumeMount{} for _, volumeMount := range volumeMounts { if volumeMount.MountPath != mountPath { mounts = append(mounts, volumeMount) } else { delete(claimsByName, volumeMount.Name) } } mounts = append(mounts, corev1.VolumeMount{ Name: claim.Name, ReadOnly: false, MountPath: volumeClaimMountPath, }) templates := []corev1.PersistentVolumeClaim{claim} for _, c := range claimsByName { templates = append(templates, c) } return templates, mounts } func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) { out := fc.Spec.JobManager if out == nil { out = &flinkOp.JobManagerSpec{} } out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations) out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels) jm := taskCtx.Job.JobManager if cpu := jm.GetResource().GetCpu(); cpu != nil { if quantity := resource.MustParse(cpu.GetString_()); !quantity.IsZero() { out.Resources.Requests[corev1.ResourceCPU] = quantity } } if memory := jm.GetResource().GetMemory(); memory != nil { if quantity := resource.MustParse(memory.GetString_()); !quantity.IsZero() { out.Resources.Limits[corev1.ResourceMemory] = quantity } } if pv := jm.GetResource().GetPersistentVolume(); pv != nil { claim := getPersistentVolumeClaim(jobManagerVolumeClaim, pv) out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim( out.VolumeClaimTemplates, out.VolumeMounts, claim, volumeClaimMountPath, ) fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath } } func (fc *FlinkCluster) updateTaskManagerSpec(taskCtx FlinkTaskContext) { out := fc.Spec.TaskManager if out == nil { out = &flinkOp.TaskManagerSpec{} } out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations) out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels) tm := taskCtx.Job.TaskManager if cpu := tm.GetResource().GetCpu(); cpu != nil { if quantity := resource.MustParse(cpu.GetString_()); !quantity.IsZero() { out.Resources.Requests[corev1.ResourceCPU] = quantity } } if memory := tm.GetResource().GetMemory(); memory != nil { if quantity := resource.MustParse(memory.GetString_()); !quantity.IsZero() { out.Resources.Limits[corev1.ResourceMemory] = quantity } } if replicas := tm.GetReplicas(); replicas > 0 { out.Replicas = &replicas } if pv := tm.GetResource().GetPersistentVolume(); pv != nil { claim := getPersistentVolumeClaim(taskManagerVolumeClaim, pv) out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim( out.VolumeClaimTemplates, out.VolumeMounts, claim, volumeClaimMountPath, ) fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath } } func GetJobArtifacts(job *flinkIdl.FlinkJob) []string { artifacts := job.GetJarFiles() if len(artifacts) == 0 { // use jflyte artifacts as fallback only urls := make([]string, len(job.GetJflyte().GetArtifacts())) for i, a := range job.GetJflyte().GetArtifacts() { urls[i] = a.Location } artifacts = urls } return artifacts } func (fc *FlinkCluster) updateJobSpec(taskCtx FlinkTaskContext) error { if fc.Spec.Job == nil { fc.Spec.Job = &flinkOp.JobSpec{} } out := fc.Spec.Job out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations) out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels) out.ClassName = &taskCtx.Job.MainClass out.Args = taskCtx.Job.Args if taskCtx.Job.Parallelism != 0 { out.Parallelism = &taskCtx.Job.Parallelism } artifacts := GetJobArtifacts(&taskCtx.Job) if out.JarFile == nil && len(artifacts) == 1 { out.JarFile = &artifacts[0] } else { initContainers := []corev1.Container{} for _, container := range out.InitContainers { resultArgs := []string{} for _, arg := range container.Args { tmpl, err := containerTmpl.Parse(arg) if err != nil { return err } var tpl bytes.Buffer if err := tmpl.Execute(&tpl, NewContainerTemplateData(artifacts)); err != nil { return err } resultArgs = append(resultArgs, tpl.String()) } container.Args = resultArgs initContainers = append(initContainers, container) } out.InitContainers = initContainers } return nil } func (fc *FlinkCluster) updateFlinkProperties(config *Config, taskCtx FlinkTaskContext) error { props := MergeProperties( fc.Spec.FlinkProperties, taskCtx.Job.FlinkProperties, config.FlinkPropertiesOverride, ) result := make(map[string]string) for k, v := range props { tmpl, err := flinkPropertiesTmpl.Parse(v) if err != nil { return err } var tpl bytes.Buffer ft := NewFlinkPropertiesTemplateData(taskCtx.Namespace, taskCtx.ClusterName, taskCtx.Labels) if err := tmpl.Execute(&tpl, ft); err != nil { return err } result[k] = tpl.String() } fc.Spec.FlinkProperties = result return nil } func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCluster, error) { cluster := FlinkCluster(*config.DefaultFlinkCluster.DeepCopy()) cluster.ObjectMeta = metav1.ObjectMeta{ Name: taskCtx.ClusterName.String(), Namespace: taskCtx.Namespace, Annotations: taskCtx.Annotations, Labels: taskCtx.Labels, } cluster.TypeMeta = metav1.TypeMeta{ Kind: KindFlinkCluster, APIVersion: flinkOp.GroupVersion.String(), } cluster.Spec.EnvVars = append(cluster.Spec.EnvVars, corev1.EnvVar{ Name: stagedJarsEnvVarName, Value: strings.Join(GetJobArtifacts(&taskCtx.Job), " "), }) cluster.updateFlinkProperties(config, taskCtx) if version := taskCtx.Job.GetFlinkVersion(); len(version) != 0 { cluster.Spec.FlinkVersion = version } if image := taskCtx.Job.GetImage(); len(image) != 0 { cluster.Spec.Image.Name = image } if sa := taskCtx.Job.GetServiceAccount(); len(sa) != 0 { cluster.Spec.ServiceAccountName = &sa } cluster.updateJobManagerSpec(taskCtx) cluster.updateTaskManagerSpec(taskCtx) if err := cluster.updateJobSpec(taskCtx); err != nil { return nil, err } // fill in defaults resource := flinkOp.FlinkCluster(cluster) return &resource, nil } func NewAnnotationPatch(key string, value string) (client.Patch, error) { annotationPatch := ObjectForPatch{ Metadata: ObjectMetaForPatch{ Annotations: map[string]interface{}{ key: value, }, }, } patchBytes, err := json.Marshal(&annotationPatch) if err != nil { return nil, err } return client.RawPatch(types.MergePatchType, patchBytes), nil }