func NewFlinkCluster()

in pkg/flink/resources.go [303:344]


// NewFlinkCluster assembles the FlinkCluster resource for a task, starting
// from a DeepCopy of config.DefaultFlinkCluster.
//
// The copy's type and object metadata are replaced with values taken from
// taskCtx, an environment variable named stagedJarsEnvVarName holding the
// space-separated job artifacts is appended to the spec, and the Flink
// version, image name and service account are overridden whenever the job
// supplies a non-empty value. Flink properties and the job manager, task
// manager and job specs are populated by the corresponding update helpers.
//
// It returns the assembled cluster, or a nil cluster together with the error
// reported by updateJobSpec.
func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCluster, error) {
	// View the copy through the local FlinkCluster type so its update helpers
	// are available.
	fc := (*FlinkCluster)(config.DefaultFlinkCluster.DeepCopy())

	fc.ObjectMeta = metav1.ObjectMeta{
		Name:        taskCtx.ClusterName.String(),
		Namespace:   taskCtx.Namespace,
		Annotations: taskCtx.Annotations,
		Labels:      taskCtx.Labels,
	}
	fc.TypeMeta = metav1.TypeMeta{
		Kind:       KindFlinkCluster,
		APIVersion: flinkOp.GroupVersion.String(),
	}

	// Pass the job artifacts to the cluster as one space-separated value.
	stagedJars := corev1.EnvVar{
		Name:  stagedJarsEnvVarName,
		Value: strings.Join(GetJobArtifacts(&taskCtx.Job), " "),
	}
	fc.Spec.EnvVars = append(fc.Spec.EnvVars, stagedJars)

	fc.updateFlinkProperties(config, taskCtx)

	// Job-level settings take precedence over the defaults when present.
	job := &taskCtx.Job
	if flinkVersion := job.GetFlinkVersion(); len(flinkVersion) > 0 {
		fc.Spec.FlinkVersion = flinkVersion
	}
	if imageName := job.GetImage(); len(imageName) > 0 {
		fc.Spec.Image.Name = imageName
	}
	if serviceAccount := job.GetServiceAccount(); len(serviceAccount) > 0 {
		fc.Spec.ServiceAccountName = &serviceAccount
	}

	fc.updateJobManagerSpec(taskCtx)
	fc.updateTaskManagerSpec(taskCtx)

	err := fc.updateJobSpec(taskCtx)
	if err != nil {
		return nil, err
	}

	// Hand the result back as the operator's own type.
	return (*flinkOp.FlinkCluster)(fc), nil
}