pkg/flink/types.go (59 lines of code) (raw):
// Copyright 2021 Spotify AB.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flink
import (
"strconv"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
)
// Properties is a flat set of string key/value configuration properties.
type Properties map[string]string

// MergeProperties flattens the given property maps into a single, newly
// allocated Properties value. Maps are applied in argument order, so when a
// key occurs in more than one map the value from the last such map wins.
// Nil maps contribute nothing, and none of the inputs are modified.
func MergeProperties(maps ...Properties) Properties {
	merged := Properties{}
	for i := range maps {
		for key, val := range maps[i] {
			merged[key] = val
		}
	}
	return merged
}

// GetInt parses the value stored under key as a base-10 integer.
//
// A key that is absent is read as the empty string, which does not parse,
// so the strconv.Atoi error is returned in that case too. Whenever an error
// is returned the integer result is 0.
func (p Properties) GetInt(key string) (int, error) {
	n, convErr := strconv.Atoi(p[key])
	if convErr == nil {
		return n, nil
	}
	return 0, convErr
}
// Annotations is a string-keyed map of annotation values.
type Annotations map[string]string

// GetDefaultAnnotations returns the union, as computed by utils.UnionMaps,
// of the DefaultAnnotations from the K8s plugin config and a copy of the
// annotations reported by taskCtx.
//
// NOTE(review): which side wins on duplicate keys is decided by
// utils.UnionMaps; presumably the later (task) map — confirm.
func GetDefaultAnnotations(taskCtx pluginsCore.TaskExecutionMetadata) Annotations {
	defaults := config.GetK8sPluginConfig().DefaultAnnotations
	fromTask := utils.CopyMap(taskCtx.GetAnnotations())
	return utils.UnionMaps(defaults, fromTask)
}
// Labels is a string-keyed map of label values.
type Labels map[string]string

// GetDefaultLabels returns the union, as computed by utils.UnionMaps, of the
// DefaultLabels from the K8s plugin config and a copy of the labels reported
// by taskCtx.
//
// NOTE(review): which side wins on duplicate keys is decided by
// utils.UnionMaps; presumably the later (task) map — confirm.
func GetDefaultLabels(taskCtx pluginsCore.TaskExecutionMetadata) Labels {
	defaults := config.GetK8sPluginConfig().DefaultLabels
	fromTask := utils.CopyMap(taskCtx.GetLabels())
	return utils.UnionMaps(defaults, fromTask)
}
// EnvVars is a map from environment variable name to value.
type EnvVars map[string]string

// GetDefaultEnvironmentVariables returns the union, as computed by
// utils.UnionMaps, of the DefaultEnvVars from the K8s plugin config and a
// copy of the environment variables reported by taskCtx.
//
// NOTE(review): which side wins on duplicate keys is decided by
// utils.UnionMaps; presumably the later (task) map — confirm.
func GetDefaultEnvironmentVariables(taskCtx pluginsCore.TaskExecutionMetadata) EnvVars {
	defaults := config.GetK8sPluginConfig().DefaultEnvVars
	fromTask := utils.CopyMap(taskCtx.GetEnvironmentVariables())
	return utils.UnionMaps(defaults, fromTask)
}
// ClusterName is a string type for cluster names that can be checked
// against regexpFlinkClusterName.
type ClusterName string

// Validate checks the name against regexpFlinkClusterName via ValidateRegEx
// and returns whatever error that check reports.
func (cn ClusterName) Validate() error {
	return ValidateRegEx(string(cn), regexpFlinkClusterName)
}

// String returns the cluster name as a plain string.
func (cn ClusterName) String() string {
	return string(cn)
}

// NewClusterName converts name into a ClusterName and validates it.
//
// On success it returns the validated name and a nil error. If validation
// fails it returns the empty ClusterName together with the validation error.
func NewClusterName(name string) (ClusterName, error) {
	cn := ClusterName(name)
	if err := cn.Validate(); err != nil {
		// Propagate the failure: returning a nil error here would make an
		// invalid name indistinguishable from a successful empty result.
		return "", err
	}
	return cn, nil
}