controllers/flinkcluster/flinkcluster_observer.go (646 lines of code) (raw):

/* Copyright 2019 Google LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package flinkcluster import ( "context" "fmt" "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" "github.com/go-logr/logr" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/controllers/history" flink "github.com/spotify/flink-on-k8s-operator/internal/flink" "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) // ClusterStateObserver gets the observed state of the cluster. type ClusterStateObserver struct { k8sClient client.Client k8sClientset *kubernetes.Clientset flinkClient *flink.Client request ctrl.Request history history.Interface recorder record.EventRecorder } // ObservedClusterState holds observed state of a cluster. type ObservedClusterState struct { cluster *v1beta1.FlinkCluster revisions []*appsv1.ControllerRevision configMap *corev1.ConfigMap haConfigMap *corev1.ConfigMap jmStatefulSet *appsv1.StatefulSet jmService *corev1.Service jmIngress *networkingv1.Ingress tmStatefulSet *appsv1.StatefulSet tmDeployment *appsv1.Deployment tmService *corev1.Service podDisruptionBudget *policyv1.PodDisruptionBudget horizontalPodAutoscaler *autoscalingv2.HorizontalPodAutoscaler persistentVolumeClaims *corev1.PersistentVolumeClaimList flinkJob FlinkJob flinkJobSubmitter FlinkJobSubmitter savepoint Savepoint revision Revision observeTime time.Time updateState UpdateState } type FlinkJob struct { status *flink.Job list *flink.JobsOverview exceptions *flink.JobExceptions unexpected []string } type FlinkJobSubmitter struct { job *batchv1.Job pod *corev1.Pod log *SubmitterLog } type SubmitterLog struct { jobID string message string } type Savepoint struct { status *flink.SavepointStatus error error } type Revision struct { currentRevision *appsv1.ControllerRevision nextRevision *appsv1.ControllerRevision collisionCount int32 } // Job submitter status. func (s *FlinkJobSubmitter) getState() JobSubmitState { switch { case s.log != nil && s.log.jobID != "": return JobDeployStateSucceeded // Job ID not found cases: // Failed and job ID not found. case s.job != nil && s.job.Status.Failed > 0: return JobDeployStateFailed // Ongoing job submission. case s.job != nil && s.job.Status.Succeeded == 0 && s.job.Status.Failed == 0: fallthrough // Finished, but failed to extract log. case s.log == nil: return JobDeployStateInProgress } // Abnormal case: successfully finished but job ID not found. return JobDeployStateUnknown } // Observes the state of the cluster and its components. // NOT_FOUND error is ignored because it is normal, other errors are returned. func (observer *ClusterStateObserver) observe(ctx context.Context, observed *ObservedClusterState) error { var log = logr.FromContextOrDiscard(ctx) // Cluster state. observed.cluster = new(v1beta1.FlinkCluster) if err := observer.observeCluster(ctx, observed.cluster); err != nil { if client.IgnoreNotFound(err) != nil { log.Error(err, "Failed to get the cluster resource") return err } observer.sendDeletedEvent() observed.cluster = nil } if observed.cluster != nil { // Revisions. if err := observer.observeRevisions(observed); err != nil { log.Error(err, "Failed to get the controllerRevision resource list") return err } // ConfigMap. if err := observer.observeConfigMap(ctx, observed); err != nil { log.Error(err, "Failed to get configMap") return err } // HA ConfigMap. if err := observer.observeHAConfigMap(ctx, observed); err != nil { log.Error(err, "Failed to get HA configMap") return err } // PodDisruptionBudget. if err := observer.observePodDisruptionBudget(ctx, observed); err != nil { log.Error(err, "Failed to get PodDisruptionBudget") return err } // JobManager StatefulSet. if !IsApplicationModeCluster(observed.cluster) { if err := observer.observeJobManager(ctx, observed); err != nil { log.Error(err, "Failed to get JobManager StatefulSet") return err } } // JobManager service. if err := observer.observeJobManagerService(ctx, observed); err != nil { log.Error(err, "Failed to get JobManager service") return err } // (Optional) JobManager ingress. if err := observer.observeJobManagerIngress(ctx, observed); err != nil { log.Error(err, "Failed to get JobManager ingress") return err } // TaskManager if err := observer.observeTaskManager(ctx, observed); err != nil { log.Error(err, "Failed to get TaskManager") return err } // HorizontalPodAutoscaler if err := observer.observeHorizontalPodAutoscaler(ctx, observed); err != nil { log.Error(err, "Failed to get HorizontalPodAutoscaler") return err } // TaskManager Service. if err := observer.observeTaskManagerService(ctx, observed); err != nil { log.Error(err, "Failed to get TaskManager Service") return err } // (Optional) Savepoint. if err := observer.observeSavepoint(observed.cluster, &observed.savepoint); err != nil { log.Error(err, "Failed to get Flink job savepoint status") } if err := observer.observePersistentVolumeClaims(ctx, observed); err != nil { log.Error(err, "Failed to get persistent volume claim list") return err } // (Optional) job. if err := observer.observeJob(ctx, observed); err != nil { log.Error(err, "Failed to get Flink job status") return err } } observed.observeTime = time.Now() observed.updateState = getUpdateState(observed) observer.logObservedState(ctx, observed) return nil } func (observer *ClusterStateObserver) sendDeletedEvent() { var eventCluster = &v1beta1.FlinkCluster{ TypeMeta: metav1.TypeMeta{ Kind: "FlinkCluster", APIVersion: "flinkoperator.k8s.io/v1beta1", }, ObjectMeta: metav1.ObjectMeta{ Name: observer.request.Name, Namespace: observer.request.Namespace, }, } observer.recorder.Event( eventCluster, "Normal", "StatusUpdate", "Cluster status: Deleted") } func (observer *ClusterStateObserver) observeJob( ctx context.Context, observed *ObservedClusterState) error { // Either the cluster has been deleted or it is a session cluster. if observed.cluster == nil || observed.cluster.Spec.Job == nil { return nil } var log = logr.FromContextOrDiscard(ctx) // Extract the log stream from pod only when the job state is Deploying. var recordedJob = observed.cluster.Status.Components.Job var jobName string var applicationMode = IsApplicationModeCluster(observed.cluster) if applicationMode { jobName = getJobManagerJobName(observed.cluster.Name) } else { jobName = getSubmitterJobName(observed.cluster.Name) } // Job resource. job := new(batchv1.Job) if err := observer.observeObject(ctx, jobName, job); err != nil { if client.IgnoreNotFound(err) != nil { log.Error(err, "job submitter batchv1.Job") } job = nil } // Get job submitter pod resource. jobPod := new(corev1.Pod) if err := observer.observeJobSubmitterPod(ctx, jobName, jobPod); err != nil { if client.IgnoreNotFound(err) != nil { log.Error(err, "job submitter corev1.Pod") } jobPod = nil } var submitterLog *SubmitterLog // Extract submission result only when it is in deployment progress or the submitter pod failed. // It is not necessary to get the log stream from the submitter pod always. var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying if jobPod != nil && (jobDeployInProgress || jobPod.Status.Phase == corev1.PodFailed) { var err error submitterLog, err = getFlinkJobSubmitLog(observer.k8sClientset, jobPod) if err != nil { // Error occurred while pulling log stream from the job submitter pod. // In this case the operator must return the error and retry in the next reconciliation iteration. log.Info("Failed to get log stream from the job submitter pod. Will try again in the next iteration.") submitterLog = nil } } observed.flinkJobSubmitter = FlinkJobSubmitter{ job: job, pod: jobPod, log: submitterLog, } // Wait until the job manager is ready. jmReady := applicationMode || (observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady) if jmReady { // Observe the Flink job status. var flinkJobID string if jobID, ok := jobPod.Labels[JobIdLabel]; ok { flinkJobID = jobID } else // Get the ID from the job submitter. if submitterLog != nil && submitterLog.jobID != "" { flinkJobID = submitterLog.jobID } else // Or get the job ID from the recorded job status which is written in previous iteration. if recordedJob != nil { flinkJobID = recordedJob.ID } observer.observeFlinkJobStatus(ctx, observed, flinkJobID, &observed.flinkJob) } return nil } // Observes Flink job status through Flink API (instead of Kubernetes jobs through // Kubernetes API). // // This needs to be done after the job manager is ready, because we use it to detect whether the Flink API server is up // and running. func (observer *ClusterStateObserver) observeFlinkJobStatus(ctx context.Context, observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) { var log = logr.FromContextOrDiscard(ctx) // Observe following var flinkJobStatus *flink.Job var flinkJobList *flink.JobsOverview var flinkJobsUnexpected []string // Get Flink job status list. flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster) flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL) if err != nil { // It is normal in many cases, not an error. log.Info("Failed to get Flink job status list.", "error", err) return } flinkJob.list = flinkJobList // Extract the current job status and unexpected jobs. for _, job := range flinkJobList.Jobs { if flinkJobID == job.Id { flinkJobStatus = new(flink.Job) *flinkJobStatus = job } else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning { flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id) } } flinkJob.status = flinkJobStatus flinkJob.unexpected = flinkJobsUnexpected log.Info("Observed Flink job", "submitted job status", flinkJob.status, "all job list", flinkJob.list, "unexpected job list", flinkJob.unexpected) if len(flinkJobsUnexpected) > 0 { log.Info("More than one unexpected Flink job were found!") } if flinkJobID == "" { log.Info("No flinkJobID given. Skipping get exceptions") } else { flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID) if err != nil { // It is normal in many cases, not an error. log.Info("Failed to get Flink job exceptions.", "error", err) } else { log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions) flinkJob.exceptions = flinkJobExceptions } } } func (observer *ClusterStateObserver) observeSavepoint(cluster *v1beta1.FlinkCluster, savepoint *Savepoint) error { if cluster == nil || cluster.Status.Savepoint == nil || cluster.Status.Savepoint.State != v1beta1.SavepointStateInProgress { return nil } // Get savepoint status in progress. var flinkAPIBaseURL = getFlinkAPIBaseURL(cluster) var recordedSavepoint = cluster.Status.Savepoint var jobID = recordedSavepoint.JobID var triggerID = recordedSavepoint.TriggerID savepointStatus, err := observer.flinkClient.GetSavepointStatus(flinkAPIBaseURL, jobID, triggerID) savepoint.status = savepointStatus savepoint.error = err return err } func (observer *ClusterStateObserver) observeCluster(ctx context.Context, cluster *v1beta1.FlinkCluster) error { return observer.k8sClient.Get(ctx, observer.request.NamespacedName, cluster) } func (observer *ClusterStateObserver) observeRevisions( observed *ObservedClusterState) error { observed.revisions = []*appsv1.ControllerRevision{} selector := labels.SelectorFromSet(labels.Set(map[string]string{history.ControllerRevisionManagedByLabel: observed.cluster.GetName()})) controllerRevisions, err := observer.history.ListControllerRevisions(observed.cluster, selector) observed.revisions = append(observed.revisions, controllerRevisions...) if client.IgnoreNotFound(err) != nil { return err } return nil } func (observer *ClusterStateObserver) observePodDisruptionBudget( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.podDisruptionBudget = new(policyv1.PodDisruptionBudget) pdbName := getPodDisruptionBudgetName(clusterName) if err := observer.observeObject(ctx, pdbName, observed.podDisruptionBudget); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.podDisruptionBudget = nil } return nil } func (observer *ClusterStateObserver) observeHorizontalPodAutoscaler( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.horizontalPodAutoscaler = new(autoscalingv2.HorizontalPodAutoscaler) hpaName := getHorizontalPodAutoscalerName(clusterName) if err := observer.observeObject(ctx, hpaName, observed.horizontalPodAutoscaler); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.horizontalPodAutoscaler = nil } return nil } func (observer *ClusterStateObserver) observeConfigMap( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.configMap = new(corev1.ConfigMap) configMapName := getConfigMapName(clusterName) if err := observer.observeObject(ctx, configMapName, observed.configMap); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.configMap = nil } return nil } func (observer *ClusterStateObserver) observeHAConfigMap( ctx context.Context, observed *ObservedClusterState) error { var fc = observed.cluster observed.haConfigMap = nil haConfigMapName := fc.GetHAConfigMapName() if haConfigMapName == "" { return nil } observed.haConfigMap = new(corev1.ConfigMap) if err := observer.observeObject(ctx, haConfigMapName, observed.haConfigMap); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.haConfigMap = nil } return nil } func (observer *ClusterStateObserver) observeJobManager( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name var jmStatefulSetName = getJobManagerName(clusterName) observed.jmStatefulSet = new(appsv1.StatefulSet) if err := observer.observeObject(ctx, jmStatefulSetName, observed.jmStatefulSet); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.jmStatefulSet = nil } return nil } func (observer *ClusterStateObserver) observeTaskManager( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name // TaskManager StatefulSet tmDeploymentType := observed.cluster.Spec.TaskManager.DeploymentType if tmDeploymentType == "" || tmDeploymentType == v1beta1.DeploymentTypeStatefulSet { observed.tmStatefulSet = new(appsv1.StatefulSet) tmName := getTaskManagerName(clusterName) if err := observer.observeObject(ctx, tmName, observed.tmStatefulSet); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.tmStatefulSet = nil } } // TaskManager Deployment if tmDeploymentType == v1beta1.DeploymentTypeDeployment { observed.tmDeployment = new(appsv1.Deployment) tmName := getTaskManagerName(clusterName) if err := observer.observeObject(ctx, tmName, observed.tmDeployment); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.tmDeployment = nil } } return nil } func (observer *ClusterStateObserver) observeTaskManagerService( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.tmService = new(corev1.Service) name := getTaskManagerName(clusterName) if err := observer.observeObject(ctx, name, observed.tmService); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.tmService = nil } return nil } func (observer *ClusterStateObserver) observeJobManagerService( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.jmService = new(corev1.Service) jmSvcName := getJobManagerServiceName(clusterName) if err := observer.observeObject(ctx, jmSvcName, observed.jmService); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.jmService = nil } return nil } func (observer *ClusterStateObserver) observeJobManagerIngress( ctx context.Context, observed *ObservedClusterState) error { var clusterName = observer.request.Name observed.jmIngress = new(networkingv1.Ingress) jmIngressName := getJobManagerIngressName(clusterName) if err := observer.observeObject(ctx, jmIngressName, observed.jmIngress); err != nil { if client.IgnoreNotFound(err) != nil { return err } observed.jmIngress = nil } return nil } // observeJobSubmitterPod observes job submitter pod. func (observer *ClusterStateObserver) observeJobSubmitterPod( ctx context.Context, jobName string, observedPod *corev1.Pod) error { var clusterNamespace = observer.request.Namespace var podSelector = labels.SelectorFromSet(map[string]string{"job-name": jobName}) var podList = new(corev1.PodList) var err = observer.k8sClient.List( ctx, podList, client.InNamespace(clusterNamespace), client.MatchingLabelsSelector{Selector: podSelector}) if err != nil { return err } if len(podList.Items) == 0 { observedPod = nil } else { podList.Items[0].DeepCopyInto(observedPod) } return nil } func (observer *ClusterStateObserver) observePersistentVolumeClaims( ctx context.Context, observed *ObservedClusterState) error { var clusterNamespace = observer.request.Namespace var clusterName = observer.request.Name var selector = labels.SelectorFromSet(map[string]string{"cluster": clusterName}) observed.persistentVolumeClaims = new(corev1.PersistentVolumeClaimList) err := observer.k8sClient.List( ctx, observed.persistentVolumeClaims, client.InNamespace(clusterNamespace), client.MatchingLabelsSelector{Selector: selector}) if client.IgnoreNotFound(err) != nil { return err } return nil } // syncRevisionStatus synchronizes current FlinkCluster resource and its child ControllerRevision resources. // When FlinkCluster resource is edited, the operator creates new child ControllerRevision for it // and updates nextRevision in FlinkClusterStatus to the name of the new ControllerRevision. // At that time, the name of the ControllerRevision is composed with the hash string generated // from the FlinkClusterSpec which is to be stored in it. // Therefore the contents of the ControllerRevision resources are maintained not duplicate. // If edited FlinkClusterSpec is the same with the content of any existing ControllerRevision resources, // the operator will only update nextRevision of the FlinkClusterStatus to the name of the ControllerRevision // that has the same content, instead of creating new ControllerRevision. // Finally, it maintains the number of child ControllerRevision resources according to RevisionHistoryLimit. func (observer *ClusterStateObserver) syncRevisionStatus(observed *ObservedClusterState) error { if observed.cluster == nil { return nil } var cluster = observed.cluster var revisions = observed.revisions var recorded = cluster.Status var currentRevision, nextRevision *appsv1.ControllerRevision var controllerHistory = observer.history revisionCount := len(revisions) history.SortControllerRevisions(revisions) // Use a local copy of cluster.Status.CollisionCount to avoid modifying cluster.Status directly. var collisionCount int32 if recorded.Revision.CollisionCount != nil { collisionCount = *recorded.Revision.CollisionCount } // create a new revision from the current cluster nextRevision, err := newRevision(cluster, util.GetNextRevisionNumber(revisions), &collisionCount) if err != nil { return err } // find any equivalent revisions equalRevisions := history.FindEqualRevisions(revisions, nextRevision) equalCount := len(equalRevisions) if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) { // if the equivalent revision is immediately prior the next revision has not changed nextRevision = revisions[revisionCount-1] } else if equalCount > 0 { // if the equivalent revision is not immediately prior we will roll back by incrementing the // Revision of the equivalent revision nextRevision, err = controllerHistory.UpdateControllerRevision( equalRevisions[equalCount-1], nextRevision.Revision) if err != nil { return err } } else { //if there is no equivalent revision we create a new one nextRevision, err = controllerHistory.CreateControllerRevision(cluster, nextRevision, &collisionCount) if err != nil { return err } } // if the current revision is nil we initialize the history by setting it to the next revision if recorded.Revision.CurrentRevision == "" { currentRevision = nextRevision // attempt to find the revision that corresponds to the current revision } else { for i := range revisions { if revisions[i].Name == getCurrentRevisionName(&recorded.Revision) { currentRevision = revisions[i] break } } } if currentRevision == nil { return fmt.Errorf("current ControlRevision resoucre not found") } // Update revision status. observed.revision = Revision{ currentRevision: currentRevision.DeepCopy(), nextRevision: nextRevision.DeepCopy(), collisionCount: collisionCount, } // maintain the revision history limit err = observer.truncateHistory(observed) if err != nil { return err } return nil } func (observer *ClusterStateObserver) truncateHistory(observed *ObservedClusterState) error { var cluster = observed.cluster var revisions = observed.revisions // TODO: default limit var historyLimit int if cluster.Spec.RevisionHistoryLimit != nil { historyLimit = int(*cluster.Spec.RevisionHistoryLimit) } else { historyLimit = 10 } nonLiveHistory := util.GetNonLiveHistory(revisions, historyLimit) // delete any non-live history to maintain the revision limit. for i := 0; i < len(nonLiveHistory); i++ { if err := observer.history.DeleteControllerRevision(nonLiveHistory[i]); err != nil { return err } } return nil } func (observer *ClusterStateObserver) observeObject(ctx context.Context, name string, obj client.Object) error { var namespace = observer.request.Namespace return observer.k8sClient.Get( ctx, types.NamespacedName{Namespace: namespace, Name: name}, obj) } func (observer *ClusterStateObserver) logObservedState(ctx context.Context, observed *ObservedClusterState) error { log := logr.FromContextOrDiscard(ctx) if observed.cluster == nil { log = log.WithValues("cluster", "nil") } else { log = log.WithValues("cluster", *observed.cluster) var b strings.Builder for _, cr := range observed.revisions { fmt.Fprintf(&b, "{name: %v, revision: %v},", cr.Name, cr.Revision) } log = log.WithValues("controllerRevisions", fmt.Sprintf("[%v]", b.String())) if observed.configMap != nil { log = log.WithValues("configMap", *observed.configMap) } else { log = log.WithValues("configMap", "nil") } if observed.podDisruptionBudget != nil { log = log.WithValues("podDisruptionBudget", *observed.podDisruptionBudget) } else { log = log.WithValues("podDisruptionBudget", "nil") } if observed.jmStatefulSet != nil { log = log.WithValues("jmStatefulSet", *observed.jmStatefulSet) } else { log = log.WithValues("jmStatefulSet", "nil") } if observed.jmService != nil { log = log.WithValues("jmService", *observed.jmService) } else { log = log.WithValues("jmService", "nil") } if observed.jmIngress != nil { log = log.WithValues("jmIngress", *observed.jmIngress) } else { log = log.WithValues("jmIngress", "nil") } if observed.tmStatefulSet != nil { log = log.WithValues("tmStatefulSet", *observed.tmStatefulSet) } else { log = log.WithValues("tmStatefulSet", "nil") } if observed.tmDeployment != nil { log = log.WithValues("tmDeployment", *observed.tmDeployment) } else { log = log.WithValues("tmDeployment", "nil") } if observed.tmService != nil { log = log.WithValues("tmService", *observed.tmService) } else { log = log.WithValues("tmService", "nil") } if observed.horizontalPodAutoscaler != nil { log = log.WithValues("horizontalPodAutoscaler", *observed.horizontalPodAutoscaler) } else { log = log.WithValues("horizontalPodAutoscaler", "nil") } if observed.savepoint.status != nil { log = log.WithValues("savepoint", *observed.savepoint.status) } else { log = log.WithValues("savepoint", "nil") } if observed.persistentVolumeClaims != nil { log = log.WithValues("persistentVolumeClaims", len(observed.persistentVolumeClaims.Items)) } else { log = log.WithValues("persistentVolumeClaims", "nil") } } log.Info("Observed state") return nil }