func NewFlinkTaskContext()

in pkg/flink/handler.go [57:103]


func NewFlinkTaskContext(ctx context.Context, taskCtx FlinkTaskExecContext) (*FlinkTaskContext, error) {
	taskTemplate, err := taskCtx.TaskReader().Read(ctx)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
	} else if taskTemplate == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "nil task specification")
	}

	job := flinkIdl.FlinkJob{}
	err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &job)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid TaskSpecification [%v], failed to unmarshal", taskTemplate.GetCustom())
	}

	err = Validate(&job)
	if err != nil {
		return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid FlinkJob [%v], failed to unmarshal", job)
	}

	taskInput, err := taskCtx.InputReader().Get(ctx)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "unable to fetch task inputs [%v]", err.Error())
	}

	// add task input literals to flink job args
	inputs := taskInput.GetLiterals()
	args, err := literalMapToFlinkJobArgs(inputs)
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "not support input arg type [%v]", err.Error())
	}
	job.Args = append(job.Args, args...)

	taskMetadata := taskCtx.TaskExecutionMetadata()
	cn, err := NewClusterName(taskMetadata.GetTaskExecutionID().GetGeneratedName())
	if err != nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "invalid cluster name [%v]", err.Error())
	}

	return &FlinkTaskContext{
		ClusterName:          cn,
		Namespace:            taskMetadata.GetNamespace(),
		Annotations:          GetDefaultAnnotations(taskMetadata),
		Labels:               GetDefaultLabels(taskMetadata),
		EnvironmentVariables: GetDefaultEnvironmentVariables(taskMetadata),
		Job:                  job,
	}, nil
}