pkg/flink/handler.go (237 lines of code) (raw):
// Copyright 2021 Spotify AB.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flink
import (
"context"
"fmt"
"time"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
flinkOp "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
flinkIdl "github.com/spotify/flyte-flink-plugin/gen/pb-go/flyteidl-flink"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyte/flytestdlib/logger"
structpb "github.com/golang/protobuf/ptypes/struct"
)
// FlinkTaskContext gathers the per-execution inputs that BuildResource hands to
// NewFlinkCluster when building the FlinkCluster resource for one task run.
// It is populated by NewFlinkTaskContext.
type FlinkTaskContext struct {
// ClusterName is derived from the task execution's generated name.
ClusterName ClusterName
// Namespace is the namespace reported by the task execution metadata.
Namespace string
// Annotations, Labels and EnvironmentVariables hold the defaults derived from
// the task execution metadata (GetDefaultAnnotations / GetDefaultLabels /
// GetDefaultEnvironmentVariables).
Annotations map[string]string
Labels map[string]string
EnvironmentVariables map[string]string
// Job is the FlinkJob unmarshalled from the task template's custom field,
// with the task's input literals appended to its Args.
Job flinkIdl.FlinkJob
}
// FlinkTaskExecContext is the narrow view of a task execution that
// NewFlinkTaskContext needs. It covers execution metadata, the task template
// reader and the input reader. A pluginsCore.TaskExecutionContext satisfies it.
type FlinkTaskExecContext interface {
	// TaskExecutionMetadata exposes the execution identity, namespace, labels, etc.
	TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata
	// TaskReader reads the task template.
	TaskReader() pluginsCore.TaskReader
	// InputReader reads the task's input literals.
	InputReader() io.InputReader
}
// NewFlinkTaskContext assembles a FlinkTaskContext from a Flyte task execution.
//
// It performs these steps in order:
//   - reads the task template;
//   - unmarshals the template's custom field into a FlinkJob and validates it;
//   - appends the task's input literals to the job arguments;
//   - derives the cluster name, namespace, annotations, labels and environment
//     variables from the task execution metadata.
//
// Every failure is reported with the BadTaskSpecification error code. Where
// there is an underlying error, it is wrapped, so the original cause is preserved.
func NewFlinkTaskContext(ctx context.Context, taskCtx FlinkTaskExecContext) (*FlinkTaskContext, error) {
	taskTemplate, err := taskCtx.TaskReader().Read(ctx)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "unable to fetch task specification")
	}
	if taskTemplate == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "nil task specification")
	}

	job := flinkIdl.FlinkJob{}
	if err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &job); err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid TaskSpecification [%v], failed to unmarshal", taskTemplate.GetCustom())
	}
	// FlinkJob comes from generated protobuf code (gen/pb-go). Such messages
	// implement String() on the pointer and should not be copied by value,
	// so the job is formatted through &job.
	if err = Validate(&job); err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid FlinkJob [%v], failed validation", &job)
	}

	// NOTE(review): an input read failure may be a transient storage problem
	// rather than a bad spec; BuildResource re-wraps it as BadTaskSpecification anyway.
	taskInput, err := taskCtx.InputReader().Get(ctx)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "unable to fetch task inputs")
	}
	// Every task input literal is appended to the Flink job arguments.
	args, err := literalMapToFlinkJobArgs(taskInput.GetLiterals())
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "unsupported input arg type")
	}
	job.Args = append(job.Args, args...)

	taskMetadata := taskCtx.TaskExecutionMetadata()
	generatedName := taskMetadata.GetTaskExecutionID().GetGeneratedName()
	cn, err := NewClusterName(generatedName)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid cluster name derived from [%v]", generatedName)
	}

	return &FlinkTaskContext{
		ClusterName:          cn,
		Namespace:            taskMetadata.GetNamespace(),
		Annotations:          GetDefaultAnnotations(taskMetadata),
		Labels:               GetDefaultLabels(taskMetadata),
		EnvironmentVariables: GetDefaultEnvironmentVariables(taskMetadata),
		// NOTE(review): this copies the proto message by value. Storing a
		// *FlinkJob would avoid it, but that changes the FlinkTaskContext API.
		Job: job,
	}, nil
}
// flinkResourceHandler provides the Kubernetes plugin hooks for Flink tasks:
// plugin properties, FlinkCluster resource building, abort behavior and phase
// reporting. It carries no state of its own; configuration is read via
// GetFlinkConfig.
type flinkResourceHandler struct{}
// GetProperties reports the k8s plugin properties for Flink tasks.
//
// The generated-name length limit comes from the Flink config, and deleting
// the resource on finalize is always disabled. When the remote-cluster config
// is enabled, finalizer and owner-reference injection are disabled too.
func (flinkResourceHandler) GetProperties() k8s.PluginProperties {
	cfg := GetFlinkConfig()
	remote := cfg.RemoteClusterConfig.Enabled
	return k8s.PluginProperties{
		GeneratedNameMaxLength:          cfg.GeneratedNameMaxLength,
		DisableDeleteResourceOnFinalize: true,
		DisableInjectFinalizer:          remote,
		DisableInjectOwnerReferences:    remote,
	}
}
// BuildResource converts a Flyte task execution into the FlinkCluster custom
// resource that runs it.
//
// NewFlinkTaskContext assembles the task context, and NewFlinkCluster renders
// it into a cluster using the plugin's Flink configuration. A failure in either
// step is wrapped as a BadTaskSpecification error.
func (flinkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
	// Start from the plugin-wide Flink configuration.
	defaults := GetFlinkConfig()

	flinkTaskCtx, ctxErr := NewFlinkTaskContext(ctx, taskCtx)
	if ctxErr != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, ctxErr, "invalid Flink task context")
	}

	cluster, buildErr := NewFlinkCluster(defaults, *flinkTaskCtx)
	if buildErr != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, buildErr, "invalid Flink cluster")
	}
	return cluster, nil
}
// BuildIdentityResource returns an otherwise empty FlinkCluster that carries
// only its TypeMeta (kind and API version). This identifies the resource type
// the plugin manages. Neither argument is consulted.
func (flinkResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionMetadata) (client.Object, error) {
	typeMeta := metav1.TypeMeta{
		APIVersion: flinkOp.GroupVersion.String(),
		Kind:       KindFlinkCluster,
	}
	return &flinkOp.FlinkCluster{TypeMeta: typeMeta}, nil
}
// OnAbort describes how the plugin machinery should abort a Flink task. It
// builds a patch that sets the operator's control annotation
// (flinkOp.ControlAnnotation) to flinkOp.ControlNameJobCancel. That patch is
// returned as a "patch the default resource" abort behavior.
//
// An error is returned only if the annotation patch cannot be built; in that
// case the returned behavior is the zero value.
func (flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, resource client.Object) (behavior k8s.AbortBehavior, err error) {
	annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel)
	if err != nil {
		// Use a format string. logger.Error joins its arguments fmt.Sprint-style,
		// which glued the message and the error together with no separator.
		logger.Errorf(ctx, "error observed in abort: %v", err)
		return behavior, err
	}
	patchOp := k8s.PatchResourceOperation{Patch: annotationPatch}
	// NOTE(review): the second argument is passed as false, as before. It is
	// presumably deleteOnErr; confirm against the k8s plugin machinery.
	return k8s.AbortBehaviorPatchDefaultResource(patchOp, false), nil
}
// FlinkTaskLogsInput identifies the Flink cluster whose log links
// FlinkClusterTaskLogs builds.
type FlinkTaskLogsInput struct {
// ClusterName is the FlinkCluster's name; it is passed to the log plugins as
// the pod name.
ClusterName string
// Namespace is the FlinkCluster's namespace.
Namespace string
}
// FlinkClusterTaskLogs builds the task log links for a Flink cluster from the
// log plugins configured in config.LogConfig. The cluster name is used as the
// pod name in the log template input, with the log name "(Job)".
//
// It returns (nil, nil) when InitializeLogPlugins yields no plugin. Errors from
// initializing or querying the log plugin are returned unchanged. ctx is
// currently unused.
func FlinkClusterTaskLogs(ctx context.Context, config *Config, fi FlinkTaskLogsInput) ([]*core.TaskLog, error) {
	logPlugin, err := logs.InitializeLogPlugins(&config.LogConfig)
	switch {
	case err != nil:
		return nil, err
	case logPlugin == nil:
		return nil, nil
	}

	jobLogs, err := logPlugin.GetTaskLogs(tasklog.Input{
		PodName:   fi.ClusterName,
		Namespace: fi.Namespace,
		LogName:   "(Job)",
	})
	if err != nil {
		return nil, err
	}
	// Copy into a fresh slice (nil when there are no logs).
	return append([]*core.TaskLog(nil), jobLogs.TaskLogs...), nil
}
// flinkClusterTaskInfo builds the TaskInfo reported for a FlinkCluster. It
// contains two things:
//   - log links from FlinkClusterTaskLogs;
//   - a FlinkExecutionInfo, marshalled into CustomInfo, holding the JobManager
//     ingress URLs and the job ID when the cluster status reports them.
func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) (*pluginsCore.TaskInfo, error) {
	logsInput := FlinkTaskLogsInput{
		ClusterName: flinkCluster.Name,
		Namespace:   flinkCluster.Namespace,
	}
	clusterLogs, err := FlinkClusterTaskLogs(ctx, GetFlinkConfig(), logsInput)
	if err != nil {
		return nil, err
	}

	execInfo := flinkIdl.FlinkExecutionInfo{}
	status := flinkCluster.Status.Components
	if ingress := status.JobManagerIngress; ingress != nil {
		execInfo.JobManager = &flinkIdl.JobManagerExecutionInfo{IngressURLs: ingress.URLs}
	}
	if job := status.Job; job != nil {
		execInfo.Job = &flinkIdl.JobExecutionInfo{Id: job.ID}
	}

	customInfo := new(structpb.Struct)
	if marshalErr := utils.MarshalStruct(&execInfo, customInfo); marshalErr != nil {
		return nil, marshalErr
	}
	return &pluginsCore.TaskInfo{
		// Copy the links into a fresh slice (nil when there are none).
		Logs:       append([]*core.TaskLog(nil), clusterLogs...),
		CustomInfo: customInfo,
	}, nil
}
// isSubmitterExitCodeRetryable reports whether a job-submitter exit code may be
// retried, meaning it does not appear in the configured NonRetryableExitCodes.
// A match is logged at info level.
func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool {
	nonRetryable := GetFlinkConfig().NonRetryableExitCodes
	for i := range nonRetryable {
		if nonRetryable[i] != exitCode {
			continue
		}
		logger.Infof(ctx, "Found non-retryable exit code: %v", nonRetryable[i])
		return false
	}
	return true
}
// flinkClusterJobPhaseInfo maps the status of the Flink job in a FlinkCluster
// to a Flyte plugin phase:
//
//   - nil jobStatus (no job status reported yet): initializing.
//   - Cancelled: permanent failure (DownstreamSystemError).
//   - Failed, DeployFailed, Lost: failure. It is retryable unless the submitter
//     exit code is in the configured NonRetryableExitCodes.
//   - Running: running.
//   - Updating, Pending, Deploying, Restarting: initializing.
//   - Succeeded: the result depends on the submitter exit code.
//       - Negative: still running (presumably the submitter has not reported
//         an exit code yet; confirm).
//       - Zero: success.
//       - Anything else: a failure whose retryability follows the exit code
//         as above.
//   - Any other state: permanent failure.
func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, occurredAt time.Time, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
	if jobStatus == nil {
		// Components.Job is a pointer and can be unset. Report progress instead
		// of dereferencing nil below.
		return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "waiting for job status", info)
	}

	logger.Infof(ctx, "job_state: %s", jobStatus.State)
	msg := fmt.Sprintf("%s %s", jobStatus.ID, jobStatus.State)
	switch jobStatus.State {
	case flinkOp.JobStateCancelled:
		return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
	case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost:
		if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) {
			reason := fmt.Sprintf("Flink Job Failed with Error: %v (retryable)", jobStatus.FailureReasons)
			return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
		}
		reason := fmt.Sprintf("Flink Job Failed with Error: %v (non-retryable)", jobStatus.FailureReasons)
		return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
	case flinkOp.JobStateRunning:
		return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
	case flinkOp.JobStateUpdating, flinkOp.JobStatePending, flinkOp.JobStateDeploying, flinkOp.JobStateRestarting:
		return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, msg, info)
	case flinkOp.JobStateSucceeded:
		exitCode := jobStatus.SubmitterExitCode
		switch {
		case exitCode < 0:
			return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
		case exitCode == 0:
			return pluginsCore.PhaseInfoSuccess(info)
		case isSubmitterExitCodeRetryable(ctx, exitCode):
			// Report the actual exit code. The old message claimed to, but only
			// printed the failure reasons.
			reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code %d: %v (retryable)", exitCode, jobStatus.FailureReasons)
			return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
		default:
			reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code %d: %v (non-retryable)", exitCode, jobStatus.FailureReasons)
			return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
		}
	default:
		unknown := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State)
		return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, unknown, info)
	}
}
// flinkClusterPhaseInfo maps the observed state of a FlinkCluster to a Flyte
// plugin phase.
//
// Task info (log links and custom execution info) is computed first. If that
// fails, the error is returned with PhaseInfoUndefined. After that, the phase
// depends on the cluster state:
//   - Creating, Reconciling or Updating: waiting for resources.
//   - Running, Stopping, Stopped or PartiallyStopped: the phase follows the job
//     status (see flinkClusterJobPhaseInfo).
//   - Any other state: running.
//
// Status.Components.Job is a pointer and may be nil. A running cluster without
// job status is reported as initializing. A stopping or stopped cluster without
// job status is an inconsistent state and is returned as an error rather than
// being dereferenced.
func flinkClusterPhaseInfo(ctx context.Context, app *flinkOp.FlinkCluster, occurredAt time.Time) (pluginsCore.PhaseInfo, error) {
	info, err := flinkClusterTaskInfo(ctx, app)
	if err != nil {
		return pluginsCore.PhaseInfoUndefined, err
	}
	jobStatus := app.Status.Components.Job
	logger.Infof(ctx, "cluster_state: %s", app.Status.State)
	switch app.Status.State {
	case flinkOp.ClusterStateCreating, flinkOp.ClusterStateReconciling, flinkOp.ClusterStateUpdating:
		return pluginsCore.PhaseInfoWaitingForResourcesInfo(occurredAt, pluginsCore.DefaultPhaseVersion, "cluster starting", info), nil
	case flinkOp.ClusterStateRunning:
		if jobStatus == nil {
			// NOTE(review): presumably transient while the operator has not yet
			// reported the job; confirm against operator behavior.
			return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "cluster running, waiting for job status", info), nil
		}
		return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
	case flinkOp.ClusterStateStopped, flinkOp.ClusterStateStopping, flinkOp.ClusterStatePartiallyStopped:
		if jobStatus == nil {
			return pluginsCore.PhaseInfoUndefined, fmt.Errorf("flink cluster %s is %s but reports no job status", app.Name, app.Status.State)
		}
		return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
	}
	return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
}
// GetTaskPhase reports the current Flyte phase of the FlinkCluster backing
// this task, stamping the phase with the current time.
//
// It returns an error, instead of panicking, when the resource is not a
// *flinkOp.FlinkCluster or is a nil one. pluginContext is not consulted.
func (flinkResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
	app, ok := resource.(*flinkOp.FlinkCluster)
	if !ok {
		return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unexpected resource type %T, expected *FlinkCluster", resource)
	}
	if app == nil {
		return pluginsCore.PhaseInfoUndefined, fmt.Errorf("nil FlinkCluster resource")
	}
	return flinkClusterPhaseInfo(ctx, app, time.Now())
}