in controllers/flinkcluster/flinkcluster_observer.go [131:231]
func (observer *ClusterStateObserver) observe(ctx context.Context, observed *ObservedClusterState) error {
var log = logr.FromContextOrDiscard(ctx)
// Cluster state.
observed.cluster = new(v1beta1.FlinkCluster)
if err := observer.observeCluster(ctx, observed.cluster); err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get the cluster resource")
return err
}
observer.sendDeletedEvent()
observed.cluster = nil
}
if observed.cluster != nil {
// Revisions.
if err := observer.observeRevisions(observed); err != nil {
log.Error(err, "Failed to get the controllerRevision resource list")
return err
}
// ConfigMap.
if err := observer.observeConfigMap(ctx, observed); err != nil {
log.Error(err, "Failed to get configMap")
return err
}
// HA ConfigMap.
if err := observer.observeHAConfigMap(ctx, observed); err != nil {
log.Error(err, "Failed to get HA configMap")
return err
}
// PodDisruptionBudget.
if err := observer.observePodDisruptionBudget(ctx, observed); err != nil {
log.Error(err, "Failed to get PodDisruptionBudget")
return err
}
// JobManager StatefulSet.
if !IsApplicationModeCluster(observed.cluster) {
if err := observer.observeJobManager(ctx, observed); err != nil {
log.Error(err, "Failed to get JobManager StatefulSet")
return err
}
}
// JobManager service.
if err := observer.observeJobManagerService(ctx, observed); err != nil {
log.Error(err, "Failed to get JobManager service")
return err
}
// (Optional) JobManager ingress.
if err := observer.observeJobManagerIngress(ctx, observed); err != nil {
log.Error(err, "Failed to get JobManager ingress")
return err
}
// TaskManager
if err := observer.observeTaskManager(ctx, observed); err != nil {
log.Error(err, "Failed to get TaskManager")
return err
}
// HorizontalPodAutoscaler
if err := observer.observeHorizontalPodAutoscaler(ctx, observed); err != nil {
log.Error(err, "Failed to get HorizontalPodAutoscaler")
return err
}
// TaskManager Service.
if err := observer.observeTaskManagerService(ctx, observed); err != nil {
log.Error(err, "Failed to get TaskManager Service")
return err
}
// (Optional) Savepoint.
if err := observer.observeSavepoint(observed.cluster, &observed.savepoint); err != nil {
log.Error(err, "Failed to get Flink job savepoint status")
}
if err := observer.observePersistentVolumeClaims(ctx, observed); err != nil {
log.Error(err, "Failed to get persistent volume claim list")
return err
}
// (Optional) job.
if err := observer.observeJob(ctx, observed); err != nil {
log.Error(err, "Failed to get Flink job status")
return err
}
}
observed.observeTime = time.Now()
observed.updateState = getUpdateState(observed)
observer.logObservedState(ctx, observed)
return nil
}