controllers/flinkcluster/flinkcluster_updater.go (959 lines of code) (raw):

/* Copyright 2019 Google LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package flinkcluster // Updater which updates the status of a cluster based on the status of its // components. import ( "encoding/json" "fmt" "reflect" "time" batchv1 "k8s.io/api/batch/v1" "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "github.com/go-logr/logr" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" ) const ( jobSubmitterPodMainContainerName = "main" ) // ClusterStatusUpdater updates the status of the FlinkCluster CR. type ClusterStatusUpdater struct { k8sClient client.Client recorder record.EventRecorder observed ObservedClusterState } type Status interface { String() string } // Compares the current status recorded in the cluster's status field and the // new status derived from the status of the components, updates the cluster // status if it is changed, returns the new status. func (updater *ClusterStatusUpdater) updateStatusIfChanged(ctx context.Context) (bool, error) { log := logr.FromContextOrDiscard(ctx) if updater.observed.cluster == nil { log.Info("The cluster has been deleted, no status to update") return false, nil } // Current status recorded in the cluster's status field. var oldStatus = v1beta1.FlinkClusterStatus{} updater.observed.cluster.Status.DeepCopyInto(&oldStatus) oldStatus.LastUpdateTime = "" // New status derived from the cluster's components. var newStatus = updater.deriveClusterStatus( ctx, updater.observed.cluster, &updater.observed) // Compare var changed = updater.isStatusChanged(ctx, oldStatus, newStatus) // Update if changed { log.Info( "Status changed", "old", updater.observed.cluster.Status, "new", newStatus) updater.createStatusChangeEvents(oldStatus, newStatus) var tc = &util.TimeConverter{} newStatus.LastUpdateTime = tc.ToString(time.Now()) return true, updater.updateClusterStatus(ctx, newStatus) } log.Info("No status change", "state", oldStatus.State) return false, nil } func (updater *ClusterStatusUpdater) createStatusChangeEvents( oldStatus v1beta1.FlinkClusterStatus, newStatus v1beta1.FlinkClusterStatus) { if oldStatus.Components.JobManager != nil && newStatus.Components.JobManager != nil && oldStatus.Components.JobManager.State != newStatus.Components.JobManager.State { updater.createStatusChangeEvent( "JobManager StatefulSet", oldStatus.Components.JobManager.State, newStatus.Components.JobManager.State) } // ConfigMap. if oldStatus.Components.ConfigMap != nil && newStatus.Components.ConfigMap != nil && oldStatus.Components.ConfigMap.State != newStatus.Components.ConfigMap.State { updater.createStatusChangeEvent( "ConfigMap", oldStatus.Components.ConfigMap.State, newStatus.Components.ConfigMap.State) } // JobManager service. if oldStatus.Components.JobManagerService.State != newStatus.Components.JobManagerService.State { updater.createStatusChangeEvent( "JobManager service", oldStatus.Components.JobManagerService.State, newStatus.Components.JobManagerService.State) } // JobManager ingress. if oldStatus.Components.JobManagerIngress == nil && newStatus.Components.JobManagerIngress != nil { updater.createStatusEvent( "JobManager ingress", newStatus.Components.JobManagerIngress.State) } if oldStatus.Components.JobManagerIngress != nil && newStatus.Components.JobManagerIngress != nil && oldStatus.Components.JobManagerIngress.State != newStatus.Components.JobManagerIngress.State { updater.createStatusChangeEvent( "JobManager ingress", oldStatus.Components.JobManagerIngress.State, newStatus.Components.JobManagerIngress.State) } // TaskManager Statefulset/Deployment. if oldStatus.Components.TaskManager != nil && newStatus.Components.TaskManager != nil && oldStatus.Components.TaskManager.State != newStatus.Components.TaskManager.State { updater.createStatusChangeEvent( "TaskManager", oldStatus.Components.TaskManager.State, newStatus.Components.TaskManager.State) } // Job. if oldStatus.Components.Job == nil && newStatus.Components.Job != nil { updater.createStatusEvent("Job", newStatus.Components.Job.State) } if oldStatus.Components.Job != nil && newStatus.Components.Job != nil && oldStatus.Components.Job.State != newStatus.Components.Job.State { updater.createStatusChangeEvent( "Job", oldStatus.Components.Job.State, newStatus.Components.Job.State) } // Cluster. if oldStatus.State != newStatus.State { updater.createStatusChangeEvent("Cluster", oldStatus.State, newStatus.State) } // Savepoint. if newStatus.Savepoint != nil && !reflect.DeepEqual(oldStatus.Savepoint, newStatus.Savepoint) { eventType, eventReason, eventMessage := getSavepointEvent(*newStatus.Savepoint) updater.recorder.Event(updater.observed.cluster, eventType, eventReason, eventMessage) } // Control. if newStatus.Control != nil && !reflect.DeepEqual(oldStatus.Control, newStatus.Control) { eventType, eventReason, eventMessage := getControlEvent(*newStatus.Control) updater.recorder.Event(updater.observed.cluster, eventType, eventReason, eventMessage) } } func (updater *ClusterStatusUpdater) createStatusEvent(name string, status Status) { updater.recorder.Event( updater.observed.cluster, "Normal", "StatusUpdate", fmt.Sprintf("%v status: %v", name, status)) } func (updater *ClusterStatusUpdater) createStatusChangeEvent( name string, oldStatus Status, newStatus Status) { updater.recorder.Event( updater.observed.cluster, "Normal", "StatusUpdate", fmt.Sprintf("%v status changed: %v -> %v", name, oldStatus, newStatus)) } func (updater *ClusterStatusUpdater) deriveClusterStatus( ctx context.Context, cluster *v1beta1.FlinkCluster, observed *ObservedClusterState) v1beta1.FlinkClusterStatus { var totalComponents int if IsApplicationModeCluster(cluster) { // jmService, tmStatefulSet. totalComponents = 2 } else { // jmStatefulSet, jmService, tmStatefulSet. totalComponents = 3 } var recorded = cluster.Status var status = v1beta1.FlinkClusterStatus{} var runningComponents = 0 // ConfigMap. var observedConfigMap = observed.configMap cmStatus := &status.Components.ConfigMap if !isComponentUpdated(observedConfigMap, observed.cluster) && shouldUpdateCluster(observed) { *cmStatus = new(v1beta1.ConfigMapStatus) recorded.Components.ConfigMap.DeepCopyInto(*cmStatus) (*cmStatus).State = v1beta1.ComponentStateUpdating } else if observedConfigMap != nil { *cmStatus = &v1beta1.ConfigMapStatus{ Name: observedConfigMap.Name, State: v1beta1.ComponentStateReady, } } else if recorded.Components.ConfigMap != nil { *cmStatus = &v1beta1.ConfigMapStatus{ Name: recorded.Components.ConfigMap.Name, State: v1beta1.ComponentStateDeleted, } } // JobManager StatefulSet. var observedJmStatefulSet = observed.jmStatefulSet jmStatus := &status.Components.JobManager if !IsApplicationModeCluster(cluster) { if !isComponentUpdated(observedJmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) { *jmStatus = new(v1beta1.JobManagerStatus) recorded.Components.JobManager.DeepCopyInto(*jmStatus) (*jmStatus).State = v1beta1.ComponentStateUpdating } else if observedJmStatefulSet != nil { *jmStatus = &v1beta1.JobManagerStatus{ Name: observedJmStatefulSet.Name, State: getStatefulSetState(observedJmStatefulSet), Replicas: observedJmStatefulSet.Status.Replicas, ReadyReplicas: observedJmStatefulSet.Status.ReadyReplicas, Ready: fmt.Sprintf("%d/%d", observedJmStatefulSet.Status.ReadyReplicas, observedJmStatefulSet.Status.Replicas), } if (*jmStatus).State == v1beta1.ComponentStateReady { runningComponents++ } } else if recorded.Components.JobManager != nil { *jmStatus = &v1beta1.JobManagerStatus{ Name: recorded.Components.JobManager.Name, State: v1beta1.ComponentStateDeleted, } } } // JobManager service. var observedJmService = observed.jmService if !isComponentUpdated(observedJmService, observed.cluster) && shouldUpdateCluster(observed) { recorded.Components.JobManagerService.DeepCopyInto(&status.Components.JobManagerService) status.Components.JobManagerService.State = v1beta1.ComponentStateUpdating } else if observedJmService != nil { var nodePort int32 var loadBalancerIngress []corev1.LoadBalancerIngress state := v1beta1.ComponentStateNotReady switch observedJmService.Spec.Type { case corev1.ServiceTypeClusterIP: if observedJmService.Spec.ClusterIP != "" { state = v1beta1.ComponentStateReady runningComponents++ } case corev1.ServiceTypeLoadBalancer: if len(observedJmService.Status.LoadBalancer.Ingress) > 0 { state = v1beta1.ComponentStateReady runningComponents++ loadBalancerIngress = observedJmService.Status.LoadBalancer.Ingress } case corev1.ServiceTypeNodePort: if len(observedJmService.Spec.Ports) > 0 { state = v1beta1.ComponentStateReady runningComponents++ for _, port := range observedJmService.Spec.Ports { if port.Name == "ui" { nodePort = port.NodePort } } } } status.Components.JobManagerService = v1beta1.JobManagerServiceStatus{ Name: observedJmService.Name, State: state, NodePort: nodePort, LoadBalancerIngress: loadBalancerIngress, } } else if recorded.Components.JobManagerService.Name != "" { status.Components.JobManagerService = v1beta1.JobManagerServiceStatus{ Name: recorded.Components.JobManagerService.Name, State: v1beta1.ComponentStateDeleted, } } // (Optional) JobManager ingress. var observedJmIngress = observed.jmIngress if !isComponentUpdated(observedJmIngress, observed.cluster) && shouldUpdateCluster(observed) { status.Components.JobManagerIngress = &v1beta1.JobManagerIngressStatus{} recorded.Components.JobManagerIngress.DeepCopyInto(status.Components.JobManagerIngress) status.Components.JobManagerIngress.State = v1beta1.ComponentStateUpdating } else if observedJmIngress != nil { var state v1beta1.ComponentState var urls []string var useTLS bool var useHost bool var loadbalancerReady bool if len(observedJmIngress.Spec.TLS) > 0 { useTLS = true } if useTLS { for _, tls := range observedJmIngress.Spec.TLS { for _, host := range tls.Hosts { if host != "" { urls = append(urls, "https://"+host) } } } } else { for _, rule := range observedJmIngress.Spec.Rules { if rule.Host != "" { urls = append(urls, "http://"+rule.Host) } } } if len(urls) > 0 { useHost = true } // Check loadbalancer is ready. if len(observedJmIngress.Status.LoadBalancer.Ingress) > 0 { var addr string for _, ingress := range observedJmIngress.Status.LoadBalancer.Ingress { // Get loadbalancer address. if ingress.Hostname != "" { addr = ingress.Hostname } else if ingress.IP != "" { addr = ingress.IP } // If ingress spec does not have host, get ip or hostname of loadbalancer. if !useHost && addr != "" { if useTLS { urls = append(urls, "https://"+addr) } else { urls = append(urls, "http://"+addr) } } } // If any ready LB found, state is ready. if addr != "" { loadbalancerReady = true } } // Jobmanager ingress state become ready when LB for ingress is specified. if loadbalancerReady { state = v1beta1.ComponentStateReady } else { state = v1beta1.ComponentStateNotReady } status.Components.JobManagerIngress = &v1beta1.JobManagerIngressStatus{ Name: observedJmIngress.Name, State: state, URLs: urls, } } else if recorded.Components.JobManagerIngress != nil && recorded.Components.JobManagerIngress.Name != "" { status.Components.JobManagerIngress = &v1beta1.JobManagerIngressStatus{ Name: recorded.Components.JobManagerIngress.Name, State: v1beta1.ComponentStateDeleted, } } labelSelector := labels.SelectorFromSet(getComponentLabels(cluster, "taskmanager")) var clusterTmDeploymentType = cluster.Spec.TaskManager.DeploymentType if clusterTmDeploymentType == "" || clusterTmDeploymentType == v1beta1.DeploymentTypeStatefulSet { // TaskManager StatefulSet. var observedTmStatefulSet = observed.tmStatefulSet tmStatus := &status.Components.TaskManager if !isComponentUpdated(observedTmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) { *tmStatus = new(v1beta1.TaskManagerStatus) recorded.Components.TaskManager.DeepCopyInto(*tmStatus) (*tmStatus).State = v1beta1.ComponentStateUpdating } else if observedTmStatefulSet != nil { *tmStatus = &v1beta1.TaskManagerStatus{ Name: observedTmStatefulSet.Name, State: getStatefulSetState(observedTmStatefulSet), Replicas: observedTmStatefulSet.Status.Replicas, ReadyReplicas: observedTmStatefulSet.Status.ReadyReplicas, Ready: fmt.Sprintf("%d/%d", observedTmStatefulSet.Status.ReadyReplicas, observedTmStatefulSet.Status.Replicas), Selector: labelSelector.String(), } if (*tmStatus).State == v1beta1.ComponentStateReady { runningComponents++ } } else if recorded.Components.TaskManager != nil { *tmStatus = &v1beta1.TaskManagerStatus{ Name: recorded.Components.TaskManager.Name, State: v1beta1.ComponentStateDeleted, } } } else { // TaskManager Deployment. var observedTmDeployment = observed.tmDeployment tmStatus := &status.Components.TaskManager if !isComponentUpdated(observedTmDeployment, observed.cluster) && shouldUpdateCluster(observed) { *tmStatus = new(v1beta1.TaskManagerStatus) recorded.Components.TaskManager.DeepCopyInto(*tmStatus) (*tmStatus).State = v1beta1.ComponentStateUpdating } else if observedTmDeployment != nil { *tmStatus = &v1beta1.TaskManagerStatus{ Name: observedTmDeployment.Name, State: getDeploymentState(observedTmDeployment), Replicas: observedTmDeployment.Status.Replicas, ReadyReplicas: observedTmDeployment.Status.ReadyReplicas, Ready: fmt.Sprintf("%d/%d", observedTmDeployment.Status.ReadyReplicas, observedTmDeployment.Status.Replicas), Selector: labelSelector.String(), } if (*tmStatus).State == v1beta1.ComponentStateReady { runningComponents++ } } else if recorded.Components.TaskManager != nil { *tmStatus = &v1beta1.TaskManagerStatus{ Name: recorded.Components.TaskManager.Name, State: v1beta1.ComponentStateDeleted, } } } // Derive the new cluster state. var jobStatus = recorded.Components.Job switch recorded.State { case "", v1beta1.ClusterStateCreating: if runningComponents < totalComponents { status.State = v1beta1.ClusterStateCreating if jobStatus.IsStopped() { var policy = observed.cluster.Spec.Job.CleanupPolicy if jobStatus.State == v1beta1.JobStateSucceeded && policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } else if jobStatus.IsFailed() && policy.AfterJobFails != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } else if jobStatus.State == v1beta1.JobStateCancelled && policy.AfterJobCancelled != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } } } else { status.State = v1beta1.ClusterStateRunning } case v1beta1.ClusterStateUpdating: if shouldUpdateCluster(observed) { status.State = v1beta1.ClusterStateUpdating } else if runningComponents < totalComponents { if recorded.Revision.IsUpdateTriggered() { status.State = v1beta1.ClusterStateUpdating } else { status.State = v1beta1.ClusterStateReconciling } } else { status.State = v1beta1.ClusterStateRunning } case v1beta1.ClusterStateRunning, v1beta1.ClusterStateReconciling: if shouldUpdateCluster(observed) { status.State = v1beta1.ClusterStateUpdating } else if !recorded.Revision.IsUpdateTriggered() && jobStatus.IsStopped() { var policy = observed.cluster.Spec.Job.CleanupPolicy if jobStatus.State == v1beta1.JobStateSucceeded && policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } else if jobStatus.IsFailed() && policy.AfterJobFails != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } else if jobStatus.State == v1beta1.JobStateCancelled && policy.AfterJobCancelled != v1beta1.CleanupActionKeepCluster { status.State = v1beta1.ClusterStateStopping } else { status.State = v1beta1.ClusterStateRunning } } else if runningComponents < totalComponents { status.State = v1beta1.ClusterStateReconciling } else { status.State = v1beta1.ClusterStateRunning } case v1beta1.ClusterStateStopping, v1beta1.ClusterStatePartiallyStopped: if shouldUpdateCluster(observed) { status.State = v1beta1.ClusterStateUpdating } else if jobStatus.IsActive() { status.State = v1beta1.ClusterStateRunning } else if runningComponents == 0 { status.State = v1beta1.ClusterStateStopped } else if runningComponents < totalComponents { status.State = v1beta1.ClusterStatePartiallyStopped } else { status.State = v1beta1.ClusterStateStopping } case v1beta1.ClusterStateStopped: if recorded.Revision.IsUpdateTriggered() { status.State = v1beta1.ClusterStateUpdating } else { status.State = v1beta1.ClusterStateStopped } default: panic(fmt.Sprintf("Unknown cluster state: %v", recorded.State)) } // (Optional) Job. // Update job status. status.Components.Job = updater.deriveJobStatus(ctx) // (Optional) Savepoint. // Update savepoint status if it is in progress or requested. var newJobStatus = status.Components.Job status.Savepoint = updater.deriveSavepointStatus( &observed.savepoint, recorded.Savepoint, newJobStatus, updater.getFlinkJobID()) // (Optional) Control. // Update user requested control status. status.Control = deriveControlStatus( observed.cluster, status.Savepoint, status.Components.Job, recorded.Control) // Update revision status. // When update completed, finish the process by marking CurrentRevision to NextRevision. status.Revision = deriveRevisionStatus( observed.updateState, &observed.revision, &recorded.Revision) return status } // Gets Flink job ID based on the observed state and the recorded state. // // It is possible that the recorded is not nil, but the observed is, due // to transient error or being skiped as an optimization. // If this returned nil, it is the state that job is not submitted or not identified yet. func (updater *ClusterStatusUpdater) getFlinkJobID() *string { // Observed from active job manager. var observedFlinkJob = updater.observed.flinkJob.status if observedFlinkJob != nil && len(observedFlinkJob.Id) > 0 { return &observedFlinkJob.Id } var observedJobSubmitter = updater.observed.flinkJobSubmitter if observedJobSubmitter.pod != nil { if jobId, ok := observedJobSubmitter.pod.Labels[JobIdLabel]; ok { return &jobId } } // Observed from job submitter (when Flink API is not ready). if observedJobSubmitter.log != nil && observedJobSubmitter.log.jobID != "" { return &observedJobSubmitter.log.jobID } // Recorded. var recordedJobStatus = updater.observed.cluster.Status.Components.Job if recordedJobStatus != nil && len(recordedJobStatus.ID) > 0 { return &recordedJobStatus.ID } return nil } func (updater *ClusterStatusUpdater) deriveJobSubmitterExitCodeAndReason(pod *corev1.Pod, job *batchv1.Job) (int32, string) { for _, containerStatus := range pod.Status.ContainerStatuses { if containerStatus.Name == jobSubmitterPodMainContainerName && containerStatus.State.Terminated != nil { exitCode := containerStatus.State.Terminated.ExitCode reason := containerStatus.State.Terminated.Reason message := containerStatus.State.Terminated.Message return exitCode, fmt.Sprintf("[Exit code: %d] Reason: %s, Message: %s", exitCode, reason, message) } } // In some cases, the finished pod maybe collected by k8s garbage collector. In this case, we use job.Status to fill it if job.Status.Succeeded == 1 { return 0, "[Exit code: 0]" } if job.Status.Failed == 1 { return 1, fmt.Sprintf("[Exit code: %d] Reason: %s", 1, "Job submitter pod is lost. Returning exit code as 1.") } return -1, "" } func isNonZeroExitCode(exitCode int32) bool { return exitCode != 0 && exitCode != -1 } func (updater *ClusterStatusUpdater) deriveJobStatus(ctx context.Context) *v1beta1.JobStatus { log := logr.FromContextOrDiscard(ctx) var observed = updater.observed var observedCluster = observed.cluster var jobSpec = observedCluster.Spec.Job if jobSpec == nil { return nil } var observedSubmitter = observed.flinkJobSubmitter var observedFlinkJob = observed.flinkJob.status var observedSavepoint = observed.savepoint var recorded = observedCluster.Status var savepoint = recorded.Savepoint var oldJob = recorded.Components.Job var newJob *v1beta1.JobStatus // Derive new job state. if oldJob != nil { newJob = oldJob.DeepCopy() } else { newJob = new(v1beta1.JobStatus) } if observedSubmitter.job != nil { newJob.SubmitterName = observedSubmitter.job.Name exitCode, _ := updater.deriveJobSubmitterExitCodeAndReason(observed.flinkJobSubmitter.pod, observed.flinkJobSubmitter.job) newJob.SubmitterExitCode = exitCode } else if observedSubmitter.job == nil || observed.flinkJobSubmitter.pod == nil { // Submitter is nil, so the submitter exit code shouldn't be "running" if oldJob != nil && oldJob.SubmitterExitCode == -1 { newJob.SubmitterExitCode = 0 } } var newJobState v1beta1.JobState switch { case oldJob == nil: newJobState = v1beta1.JobStatePending case shouldUpdateJob(&observed): newJobState = v1beta1.JobStateUpdating case oldJob.IsStopped(): // When a new job is deploying, update the job state to deploying. if observedSubmitter.job != nil && (observedSubmitter.job.Status.Active == 1 || isJobInitialising(observedSubmitter.job.Status)) { newJobState = v1beta1.JobStateDeploying } else { newJobState = oldJob.State } case oldJob.IsPending() && oldJob.DeployTime != "": newJobState = v1beta1.JobStateDeploying // Derive the job state from the observed Flink job, if it exists. case observedFlinkJob != nil: if observedFlinkJob.Id != "" { newJob.ID = observedFlinkJob.Id } if observedFlinkJob.Name != "" { newJob.Name = observedFlinkJob.Name } tmpState := getFlinkJobDeploymentState(observedFlinkJob.State) if observedSubmitter.job == nil || (observedSubmitter.job.Status.Failed < 1 && tmpState != v1beta1.JobStateSucceeded) { newJobState = tmpState break } log.Info("The submitter maybe still running. Waiting for it") fallthrough case oldJob.IsActive() && observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 0: if observedSubmitter.job.Status.Succeeded == 1 { newJobState = v1beta1.JobStateSucceeded if newJob.SubmitterExitCode == -1 { log.Info("Job succeeded but the exit code is -1. This is an edge case that may " + "happen if the controller is down or busy for a long time and the submitter pod is deleted externally " + "including by kube-system:pod-garbage-collector. Changing exit code to 0.") newJob.SubmitterExitCode = 0 } } else if observedSubmitter.job.Status.Failed == 1 { newJobState = v1beta1.JobStateFailed } else { newJobState = oldJob.State } case shouldStopJob(observedCluster): newJobState = v1beta1.JobStateCancelled // When Flink job not found in JobManager or JobManager is unavailable case isFlinkAPIReady(observed.flinkJob.list): if oldJob.State == v1beta1.JobStateRunning { newJobState = v1beta1.JobStateLost break } fallthrough default: // Maintain the job state as recorded if job is not being deployed. if oldJob.State != v1beta1.JobStateDeploying { newJobState = oldJob.State break } // Job must be in deployment but the submitter not found or tracking failed. var jobDeployState = observedSubmitter.getState() if observedSubmitter.job == nil || jobDeployState == JobDeployStateUnknown { newJobState = v1beta1.JobStateLost break } // Case in which the job submission clearly fails even if it is not confirmed by JobManager // Job submitter is deployed but failed. if jobDeployState == JobDeployStateFailed { newJobState = v1beta1.JobStateDeployFailed break } newJobState = oldJob.State } // Update State newJob.State = newJobState // Derived new job status if the state is changed. if oldJob == nil || oldJob.State != newJob.State { // TODO: It would be ideal to set the times with the timestamp retrieved from the Flink API like /jobs/{job-id}. switch { case newJob.IsPending(): newJob.DeployTime = "" switch newJob.State { case v1beta1.JobStateUpdating: newJob.RestartCount = 0 case v1beta1.JobStateRestarting: newJob.RestartCount++ } case newJob.State == v1beta1.JobStateRunning: util.SetTimestamp(&newJob.StartTime) newJob.CompletionTime = nil // When job started, the savepoint is not the final state of the job any more. if oldJob.FinalSavepoint { newJob.FinalSavepoint = false } case newJob.IsFailed(): if len(newJob.FailureReasons) == 0 { newJob.FailureReasons = []string{} exceptions := observed.flinkJob.exceptions if exceptions != nil && len(exceptions.Exceptions) > 0 { for _, e := range exceptions.Exceptions { newJob.FailureReasons = append(newJob.FailureReasons, e.Exception) } } else if observedSubmitter.log != nil { newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message) } } fallthrough case newJob.IsStopped(): if newJob.CompletionTime.IsZero() { now := metav1.Now() newJob.CompletionTime = &now } // When tracking failed, we cannot guarantee if the savepoint is the final job state. if newJob.State == v1beta1.JobStateLost && oldJob.FinalSavepoint { newJob.FinalSavepoint = false } // The job submitter may have failed even though the job execution was successful if len(newJob.FailureReasons) == 0 && oldJob.SubmitterExitCode != newJob.SubmitterExitCode && isNonZeroExitCode(newJob.SubmitterExitCode) && observedSubmitter.log != nil { newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message) } } } // Savepoint if observedSavepoint.status != nil && observedSavepoint.status.IsSuccessful() { newJob.SavepointGeneration++ newJob.SavepointLocation = observedSavepoint.status.Location if finalSavepointRequested(newJob.ID, savepoint) { newJob.FinalSavepoint = true } // TODO: SavepointTime should be set with the timestamp generated in job manager. // Currently savepoint complete timestamp is not included in savepoints API response. // Whereas checkpoint API returns the timestamp latest_ack_timestamp. // Note: https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-checkpoints-details-checkpointid util.SetTimestamp(&newJob.SavepointTime) } return newJob } func (updater *ClusterStatusUpdater) isStatusChanged( ctx context.Context, currentStatus v1beta1.FlinkClusterStatus, newStatus v1beta1.FlinkClusterStatus) bool { log := logr.FromContextOrDiscard(ctx) var changed = false if newStatus.State != currentStatus.State { changed = true log.Info( "Cluster state changed", "current", currentStatus.State, "new", newStatus.State) } if !reflect.DeepEqual(newStatus.Control, currentStatus.Control) { log.Info( "Control status changed", "current", currentStatus.Control, "new", newStatus.Control) changed = true } if !reflect.DeepEqual(newStatus.Components.ConfigMap, currentStatus.Components.ConfigMap) { log.Info( "ConfigMap status changed", "current", currentStatus.Components.ConfigMap, "new", newStatus.Components.ConfigMap) changed = true } if !reflect.DeepEqual(newStatus.Components.JobManager, currentStatus.Components.JobManager) { log.Info( "JobManager StatefulSet status changed", "current", currentStatus.Components.JobManager, "new", newStatus.Components.JobManager) changed = true } if !reflect.DeepEqual(newStatus.Components.JobManagerService, currentStatus.Components.JobManagerService) { log.Info( "JobManager service status changed", "current", currentStatus.Components.JobManagerService, "new", newStatus.Components.JobManagerService) changed = true } if currentStatus.Components.JobManagerIngress == nil { if newStatus.Components.JobManagerIngress != nil { log.Info( "JobManager ingress status changed", "current", "nil", "new", *newStatus.Components.JobManagerIngress) changed = true } } else { if newStatus.Components.JobManagerIngress.State != currentStatus.Components.JobManagerIngress.State { log.Info( "JobManager ingress status changed", "current", *currentStatus.Components.JobManagerIngress, "new", *newStatus.Components.JobManagerIngress) changed = true } } if !reflect.DeepEqual(newStatus.Components.TaskManager, currentStatus.Components.TaskManager) { log.Info( "TaskManager StatefulSet status changed", "current", currentStatus.Components.TaskManager, "new", newStatus.Components.TaskManager) changed = true } if currentStatus.Components.Job == nil { if newStatus.Components.Job != nil { log.Info( "Job status changed", "current", "nil", "new", *newStatus.Components.Job) changed = true } } else { if newStatus.Components.Job != nil { var isEqual = reflect.DeepEqual( newStatus.Components.Job, currentStatus.Components.Job) if !isEqual { log.Info( "Job status changed", "current", *currentStatus.Components.Job, "new", *newStatus.Components.Job) changed = true } } else { changed = true } } if !reflect.DeepEqual(newStatus.Savepoint, currentStatus.Savepoint) { log.Info( "Savepoint status changed", "current", currentStatus.Savepoint, "new", newStatus.Savepoint) changed = true } var nr = newStatus.Revision // New revision status var cr = currentStatus.Revision // Current revision status if nr.CurrentRevision != cr.CurrentRevision || nr.NextRevision != cr.NextRevision || (nr.CollisionCount != nil && cr.CollisionCount == nil) || (cr.CollisionCount != nil && *nr.CollisionCount != *cr.CollisionCount) { log.Info( "FlinkCluster revision status changed", "current", fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", cr.CurrentRevision, cr.NextRevision, cr.CollisionCount), "new", fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", nr.CurrentRevision, nr.NextRevision, nr.CollisionCount)) changed = true } return changed } func (updater *ClusterStatusUpdater) updateClusterStatus( ctx context.Context, status v1beta1.FlinkClusterStatus) error { return retry.RetryOnConflict(retry.DefaultBackoff, func() error { cluster := &v1beta1.FlinkCluster{} updater.observed.cluster.DeepCopyInto(cluster) lookupKey := types.NamespacedName{ Name: cluster.Name, Namespace: cluster.Namespace, } err := updater.k8sClient.Get(ctx, lookupKey, cluster) if err != nil { if client.IgnoreNotFound(err) != nil { return nil } return err } cluster.Status = status err = updater.k8sClient.Status().Update(ctx, cluster) // Clear control annotation after status update is complete. updater.clearControlAnnotation(ctx, status.Control) return err }) } // Clear finished or improper user control in annotations func (updater *ClusterStatusUpdater) clearControlAnnotation(ctx context.Context, newControlStatus *v1beta1.FlinkClusterControlStatus) error { var userControl = updater.observed.cluster.Annotations[v1beta1.ControlAnnotation] if userControl == "" { return nil } if newControlStatus == nil || userControl != newControlStatus.Name || // refused control in updater (userControl == newControlStatus.Name && isUserControlFinished(newControlStatus)) /* finished control */ { // make annotation patch cleared annotationPatch := objectForPatch{ Metadata: objectMetaForPatch{ Annotations: map[string]interface{}{ v1beta1.ControlAnnotation: nil, }, }, } patchBytes, err := json.Marshal(&annotationPatch) if err != nil { return err } rawPatch := client.RawPatch(types.MergePatchType, patchBytes) return updater.k8sClient.Patch(ctx, updater.observed.cluster, rawPatch) } return nil } func (updater *ClusterStatusUpdater) deriveSavepointStatus( observedSavepoint *Savepoint, recordedSavepointStatus *v1beta1.SavepointStatus, newJobStatus *v1beta1.JobStatus, flinkJobID *string) *v1beta1.SavepointStatus { if recordedSavepointStatus == nil { return nil } // Derived savepoint status to return var s = recordedSavepointStatus.DeepCopy() var errMsg string // Update the savepoint status when observed savepoint is found. if s.State == v1beta1.SavepointStateInProgress { // Derive the state from the observed savepoint in JobManager. status := observedSavepoint.status switch { case status != nil && status.IsSuccessful(): s.State = v1beta1.SavepointStateSucceeded case status != nil && status.IsFailed(): s.State = v1beta1.SavepointStateFailed errMsg = fmt.Sprintf("Savepoint error: %v", observedSavepoint.status.FailureCause.StackTrace) case observedSavepoint.error != nil: s.State = v1beta1.SavepointStateFailed errMsg = fmt.Sprintf("Failed to get savepoint status: %v", observedSavepoint.error) } // Derive the failure state from Flink job status. // Append additional error message if it already exists. if s.State == v1beta1.SavepointStateFailed { switch { case newJobStatus.IsStopped(): errMsg = "Flink job is stopped: " + errMsg s.State = v1beta1.SavepointStateFailed case flinkJobID == nil || *flinkJobID != recordedSavepointStatus.JobID: errMsg = "Savepoint triggered Flink job is lost: " + errMsg s.State = v1beta1.SavepointStateFailed } } } // TODO: Record event or introduce Condition in CRD status to notify update state pended. // https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376 // Make up message. if errMsg != "" { if s.TriggerReason == v1beta1.SavepointReasonUpdate { errMsg = "Failed to take savepoint for update. " + "The update process is being postponed until a savepoint is available. " + errMsg } if len(errMsg) > 1024 { errMsg = errMsg[:1024] } s.Message = errMsg } return s } func deriveControlStatus( cluster *v1beta1.FlinkCluster, newSavepoint *v1beta1.SavepointStatus, newJob *v1beta1.JobStatus, recordedControl *v1beta1.FlinkClusterControlStatus) *v1beta1.FlinkClusterControlStatus { var controlRequest = getNewControlRequest(cluster) // Derived control status to return var c *v1beta1.FlinkClusterControlStatus // New control status if controlStatusChanged(cluster, controlRequest) { c = getControlStatus(controlRequest, v1beta1.ControlStateRequested) return c } // Update control status in progress. if recordedControl != nil && recordedControl.State == v1beta1.ControlStateInProgress { c = recordedControl.DeepCopy() switch recordedControl.Name { case v1beta1.ControlNameJobCancel: switch { case newJob.State == v1beta1.JobStateCancelled: if newSavepoint != nil { if newSavepoint.State == v1beta1.SavepointStateSucceeded { c.State = v1beta1.ControlStateSucceeded } else if newSavepoint.IsFailed() && newSavepoint.TriggerReason == v1beta1.SavepointReasonJobCancel { c.Message = "Aborted job cancellation: failed to take savepoint." c.State = v1beta1.ControlStateFailed } } else { c.State = v1beta1.ControlStateSucceeded } case newJob.IsStopped(): c.Message = "Aborted job cancellation: job is stopped already." c.State = v1beta1.ControlStateFailed } case v1beta1.ControlNameSavepoint: if newSavepoint == nil { c.Message = "Aborted: savepoint not defined" c.State = v1beta1.ControlStateFailed } else if newSavepoint.State == v1beta1.SavepointStateSucceeded { c.State = v1beta1.ControlStateSucceeded } else if newSavepoint.IsFailed() && newSavepoint.TriggerReason == v1beta1.SavepointReasonUserRequested { c.State = v1beta1.ControlStateFailed } } // Update time when state changed. if c.State != v1beta1.ControlStateInProgress { util.SetTimestamp(&c.UpdateTime) } return c } // Maintain control status if there is no change. if recordedControl != nil && c == nil { c = recordedControl.DeepCopy() return c } return nil } func deriveRevisionStatus( updateState UpdateState, observedRevision *Revision, recordedRevision *v1beta1.RevisionStatus, ) v1beta1.RevisionStatus { // Derived revision status var r = v1beta1.RevisionStatus{} // Finalize update process. if updateState == UpdateStateFinished { r.CurrentRevision = recordedRevision.NextRevision } // Update revision status. r.NextRevision = util.GetRevisionWithNameNumber(observedRevision.nextRevision) if r.CurrentRevision == "" { if recordedRevision.CurrentRevision == "" { r.CurrentRevision = util.GetRevisionWithNameNumber(observedRevision.currentRevision) } else { r.CurrentRevision = recordedRevision.CurrentRevision } } if observedRevision.collisionCount != 0 { r.CollisionCount = new(int32) *r.CollisionCount = observedRevision.collisionCount } return r } func getStatefulSetState(statefulSet *appsv1.StatefulSet) v1beta1.ComponentState { if statefulSet.Status.ReadyReplicas >= *statefulSet.Spec.Replicas { return v1beta1.ComponentStateReady } return v1beta1.ComponentStateNotReady } func getDeploymentState(deployment *appsv1.Deployment) v1beta1.ComponentState { if deployment.Status.ReadyReplicas >= *deployment.Spec.Replicas { return v1beta1.ComponentStateReady } return v1beta1.ComponentStateNotReady }