in controllers/flinkcluster/flinkcluster_observer.go [508:538]
// observeTaskManager looks up the workload object that backs the TaskManager
// and records it on observed.
//
// Which field is populated depends on observed.cluster.Spec.TaskManager.DeploymentType:
//   - empty or v1beta1.DeploymentTypeStatefulSet: observed.tmStatefulSet
//   - v1beta1.DeploymentTypeDeployment:           observed.tmDeployment
//
// Any other value leaves both fields untouched. When the lookup fails with an
// error that client.IgnoreNotFound suppresses, the corresponding field is reset
// to nil and no error is reported; every other lookup error is returned as is
// (the field then still points at the freshly allocated, unfilled object).
func (observer *ClusterStateObserver) observeTaskManager(
	ctx context.Context,
	observed *ObservedClusterState) error {
	switch observed.cluster.Spec.TaskManager.DeploymentType {
	case "", v1beta1.DeploymentTypeStatefulSet:
		// An unset deployment type is treated the same as StatefulSet.
		statefulSet := new(appsv1.StatefulSet)
		observed.tmStatefulSet = statefulSet
		name := getTaskManagerName(observer.request.Name)
		err := observer.observeObject(ctx, name, statefulSet)
		if fatal := client.IgnoreNotFound(err); fatal != nil {
			return fatal
		}
		if err != nil {
			// The only error that gets here is the ignorable "not found" one:
			// record the object as absent.
			observed.tmStatefulSet = nil
		}
	case v1beta1.DeploymentTypeDeployment:
		deployment := new(appsv1.Deployment)
		observed.tmDeployment = deployment
		name := getTaskManagerName(observer.request.Name)
		err := observer.observeObject(ctx, name, deployment)
		if fatal := client.IgnoreNotFound(err); fatal != nil {
			return fatal
		}
		if err != nil {
			// The only error that gets here is the ignorable "not found" one:
			// record the object as absent.
			observed.tmDeployment = nil
		}
	}
	return nil
}