func()

in controllers/flinkcluster/flinkcluster_observer.go [508:538]


func (observer *ClusterStateObserver) observeTaskManager(
	ctx context.Context,
	observed *ObservedClusterState) error {
	var clusterName = observer.request.Name
	// TaskManager StatefulSet
	tmDeploymentType := observed.cluster.Spec.TaskManager.DeploymentType
	if tmDeploymentType == "" || tmDeploymentType == v1beta1.DeploymentTypeStatefulSet {
		observed.tmStatefulSet = new(appsv1.StatefulSet)
		tmName := getTaskManagerName(clusterName)
		if err := observer.observeObject(ctx, tmName, observed.tmStatefulSet); err != nil {
			if client.IgnoreNotFound(err) != nil {
				return err
			}
			observed.tmStatefulSet = nil
		}
	}

	// TaskManager Deployment
	if tmDeploymentType == v1beta1.DeploymentTypeDeployment {
		observed.tmDeployment = new(appsv1.Deployment)
		tmName := getTaskManagerName(clusterName)
		if err := observer.observeObject(ctx, tmName, observed.tmDeployment); err != nil {
			if client.IgnoreNotFound(err) != nil {
				return err
			}
			observed.tmDeployment = nil
		}
	}

	return nil
}