in pkg/flink/resources.go [303:344]
func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCluster, error) {
cluster := FlinkCluster(*config.DefaultFlinkCluster.DeepCopy())
cluster.ObjectMeta = metav1.ObjectMeta{
Name: taskCtx.ClusterName.String(),
Namespace: taskCtx.Namespace,
Annotations: taskCtx.Annotations,
Labels: taskCtx.Labels,
}
cluster.TypeMeta = metav1.TypeMeta{
Kind: KindFlinkCluster,
APIVersion: flinkOp.GroupVersion.String(),
}
cluster.Spec.EnvVars = append(cluster.Spec.EnvVars, corev1.EnvVar{
Name: stagedJarsEnvVarName,
Value: strings.Join(GetJobArtifacts(&taskCtx.Job), " "),
})
cluster.updateFlinkProperties(config, taskCtx)
if version := taskCtx.Job.GetFlinkVersion(); len(version) != 0 {
cluster.Spec.FlinkVersion = version
}
if image := taskCtx.Job.GetImage(); len(image) != 0 {
cluster.Spec.Image.Name = image
}
if sa := taskCtx.Job.GetServiceAccount(); len(sa) != 0 {
cluster.Spec.ServiceAccountName = &sa
}
cluster.updateJobManagerSpec(taskCtx)
cluster.updateTaskManagerSpec(taskCtx)
if err := cluster.updateJobSpec(taskCtx); err != nil {
return nil, err
}
// fill in defaults
resource := flinkOp.FlinkCluster(cluster)
return &resource, nil
}