in pkg/flink/handler.go [57:103]
// NewFlinkTaskContext assembles the FlinkTaskContext for a single task execution.
//
// It reads the task template from taskCtx, decodes the template's custom
// section into a FlinkJob, validates the job, converts the task's input
// literals into job arguments (appended after any arguments already present on
// the job), and derives the cluster name from the generated name of the task
// execution ID. Namespace, annotations, labels and environment variables are
// taken from the task execution metadata.
//
// On any failure it returns a nil context and an error carrying the
// BadTaskSpecification code.
func NewFlinkTaskContext(ctx context.Context, taskCtx FlinkTaskExecContext) (*FlinkTaskContext, error) {
	taskTemplate, err := taskCtx.TaskReader().Read(ctx)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
	}
	if taskTemplate == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "nil task specification")
	}

	// Decode the plugin-specific payload of the template into a FlinkJob.
	job := flinkIdl.FlinkJob{}
	err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &job)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid TaskSpecification [%v], failed to unmarshal", taskTemplate.GetCustom())
	}

	err = Validate(&job)
	if err != nil {
		// The job decoded successfully but was rejected by Validate, so report a
		// validation failure rather than an unmarshal failure. The job is passed
		// by pointer so that formatting the message does not copy the struct.
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid FlinkJob [%v], failed to validate", &job)
	}

	taskInput, err := taskCtx.InputReader().Get(ctx)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "unable to fetch task inputs [%v]", err.Error())
	}

	// Add task input literals to the flink job args; they go after any args
	// already declared on the job itself.
	inputs := taskInput.GetLiterals()
	args, err := literalMapToFlinkJobArgs(inputs)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "not support input arg type [%v]", err.Error())
	}
	job.Args = append(job.Args, args...)

	// The cluster name is derived from the generated name of this task execution.
	taskMetadata := taskCtx.TaskExecutionMetadata()
	cn, err := NewClusterName(taskMetadata.GetTaskExecutionID().GetGeneratedName())
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "invalid cluster name [%v]", err.Error())
	}

	return &FlinkTaskContext{
		ClusterName:          cn,
		Namespace:            taskMetadata.GetNamespace(),
		Annotations:          GetDefaultAnnotations(taskMetadata),
		Labels:               GetDefaultLabels(taskMetadata),
		EnvironmentVariables: GetDefaultEnvironmentVariables(taskMetadata),
		Job:                  job,
	}, nil
}