in pkg/flink/handler.go [123:137]
func (flinkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
// Start with default config values.
config := GetFlinkConfig()
flinkTaskCtx, err := NewFlinkTaskContext(ctx, taskCtx)
if err != nil {
return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid Flink task context")
}
cluster, err := NewFlinkCluster(config, *flinkTaskCtx)
if err != nil {
return nil, errors.Wrapf(errors.BadTaskSpecification, err, "invalid Flink cluster")
}
return cluster, nil
}