func()

in controllers/flinkcluster/flinkcluster_observer.go [131:231]


// observe refreshes observed with the current state of the FlinkCluster and
// of the resources observed for it.
//
// The cluster resource is fetched first. A not-found result is not treated as
// an error: observer.sendDeletedEvent is called, observed.cluster is set to
// nil and the per-component observations are skipped. Any other failure is
// logged and returned.
//
// When the cluster exists, the components are observed in a fixed order. The
// first failing observation is logged and returned, except for the savepoint
// observation, whose failure is only logged.
//
// On success observed.observeTime and observed.updateState are set and the
// observed state is logged.
func (observer *ClusterStateObserver) observe(ctx context.Context, observed *ObservedClusterState) error {
	logger := logr.FromContextOrDiscard(ctx)

	// Cluster state.
	observed.cluster = new(v1beta1.FlinkCluster)
	switch err := observer.observeCluster(ctx, observed.cluster); {
	case err == nil:
		// Cluster found; continue with the component observations below.
	case client.IgnoreNotFound(err) == nil:
		// Not found: report the deletion and leave the cluster unset.
		observer.sendDeletedEvent()
		observed.cluster = nil
	default:
		logger.Error(err, "Failed to get the cluster resource")
		return err
	}

	if observed.cluster != nil {
		// step is a single observation together with the message logged when
		// it fails. A bestEffort step is logged on failure but does not abort
		// the remaining observations.
		type step struct {
			failure    string
			bestEffort bool
			run        func() error
		}

		// The steps run strictly in the order listed here.
		steps := []step{
			{
				// Revisions.
				failure: "Failed to get the controllerRevision resource list",
				run:     func() error { return observer.observeRevisions(observed) },
			},
			{
				// ConfigMap.
				failure: "Failed to get configMap",
				run:     func() error { return observer.observeConfigMap(ctx, observed) },
			},
			{
				// HA ConfigMap.
				failure: "Failed to get HA configMap",
				run:     func() error { return observer.observeHAConfigMap(ctx, observed) },
			},
			{
				// PodDisruptionBudget.
				failure: "Failed to get PodDisruptionBudget",
				run:     func() error { return observer.observePodDisruptionBudget(ctx, observed) },
			},
			{
				// JobManager StatefulSet.
				failure: "Failed to get JobManager StatefulSet",
				run: func() error {
					// Not observed for application-mode clusters.
					if IsApplicationModeCluster(observed.cluster) {
						return nil
					}
					return observer.observeJobManager(ctx, observed)
				},
			},
			{
				// JobManager service.
				failure: "Failed to get JobManager service",
				run:     func() error { return observer.observeJobManagerService(ctx, observed) },
			},
			{
				// (Optional) JobManager ingress.
				failure: "Failed to get JobManager ingress",
				run:     func() error { return observer.observeJobManagerIngress(ctx, observed) },
			},
			{
				// TaskManager.
				failure: "Failed to get TaskManager",
				run:     func() error { return observer.observeTaskManager(ctx, observed) },
			},
			{
				// HorizontalPodAutoscaler.
				failure: "Failed to get HorizontalPodAutoscaler",
				run:     func() error { return observer.observeHorizontalPodAutoscaler(ctx, observed) },
			},
			{
				// TaskManager Service.
				failure: "Failed to get TaskManager Service",
				run:     func() error { return observer.observeTaskManagerService(ctx, observed) },
			},
			{
				// (Optional) Savepoint: a failure here is logged only.
				failure:    "Failed to get Flink job savepoint status",
				bestEffort: true,
				run:        func() error { return observer.observeSavepoint(observed.cluster, &observed.savepoint) },
			},
			{
				// PersistentVolumeClaims.
				failure: "Failed to get persistent volume claim list",
				run:     func() error { return observer.observePersistentVolumeClaims(ctx, observed) },
			},
			{
				// (Optional) job.
				failure: "Failed to get Flink job status",
				run:     func() error { return observer.observeJob(ctx, observed) },
			},
		}

		for _, s := range steps {
			err := s.run()
			if err == nil {
				continue
			}
			logger.Error(err, s.failure)
			if !s.bestEffort {
				return err
			}
		}
	}

	observed.observeTime = time.Now()
	observed.updateState = getUpdateState(observed)

	observer.logObservedState(ctx, observed)

	return nil
}