controllers/flinkcluster/flinkcluster_reconciler.go (764 lines of code) (raw):

/* Copyright 2019 Google LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package flinkcluster import ( "context" "fmt" "reflect" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "github.com/go-logr/logr" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/batchscheduler" schedulerTypes "github.com/spotify/flink-on-k8s-operator/internal/batchscheduler/types" "github.com/spotify/flink-on-k8s-operator/internal/flink" "github.com/spotify/flink-on-k8s-operator/internal/model" "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) // ClusterReconciler takes actions to drive the observed state towards the // desired state. type ClusterReconciler struct { k8sClient client.Client flinkClient *flink.Client observed ObservedClusterState desired model.DesiredClusterState recorder record.EventRecorder } const JobCheckInterval = 10 * time.Second var requeueResult = ctrl.Result{RequeueAfter: JobCheckInterval, Requeue: true} // Compares the desired state and the observed state, if there is a difference, // takes actions to drive the observed state towards the desired state. func (reconciler *ClusterReconciler) reconcile(ctx context.Context) (ctrl.Result, error) { var err error log := logr.FromContextOrDiscard(ctx) // Child resources of the cluster CR will be automatically reclaimed by K8S. if reconciler.observed.cluster == nil { log.Info("The cluster has been deleted, no action to take") return ctrl.Result{}, nil } if shouldUpdateCluster(&reconciler.observed) { log.Info("The cluster update is in progress") } err = reconciler.reconcileBatchScheduler() if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileConfigMap(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileHAConfigMap(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcilePodDisruptionBudget(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileJobManagerStatefulSet(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileJobManagerService(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileJobManagerIngress(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileTaskManagerStatefulSet(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileTaskManagerDeployment(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileHorizontalPodAutoscaler(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcileTaskManagerService(ctx) if err != nil { return ctrl.Result{}, err } err = reconciler.reconcilePersistentVolumeClaims(ctx) if err != nil { return ctrl.Result{}, err } result, err := reconciler.reconcileJob(ctx) if err != nil { return ctrl.Result{}, err } return result, nil } func (reconciler *ClusterReconciler) reconcileBatchScheduler() error { cluster := reconciler.observed.cluster schedulerSpec := cluster.Spec.BatchScheduler if schedulerSpec == nil || schedulerSpec.Name == "" { return nil } scheduler, err := batchscheduler.GetScheduler(schedulerSpec.Name) if err != nil { return err } options := schedulerTypes.SchedulerOptions{ ClusterName: cluster.Name, ClusterNamespace: cluster.Namespace, Queue: schedulerSpec.Queue, PriorityClassName: schedulerSpec.PriorityClassName, OwnerReferences: []metav1.OwnerReference{ToOwnerReference(cluster)}, } err = scheduler.Schedule(options, &reconciler.desired) if err != nil { return err } return nil } func (reconciler *ClusterReconciler) reconcileJobManagerStatefulSet(ctx context.Context) error { return reconciler.reconcileComponent( ctx, "JobManager", reconciler.desired.JmStatefulSet, reconciler.observed.jmStatefulSet) } func (reconciler *ClusterReconciler) reconcileTaskManagerStatefulSet(ctx context.Context) error { return reconciler.reconcileComponent( ctx, "TaskManager", reconciler.desired.TmStatefulSet, reconciler.observed.tmStatefulSet) } func (reconciler *ClusterReconciler) reconcileTaskManagerDeployment(ctx context.Context) error { return reconciler.reconcileComponent( ctx, "TaskManager", reconciler.desired.TmDeployment, reconciler.observed.tmDeployment) } func (reconciler *ClusterReconciler) reconcileComponent( ctx context.Context, component string, desiredObj client.Object, observedObj client.Object) error { log := logr.FromContextOrDiscard(ctx).WithValues("component", component) desiredObjIsNil := reflect.ValueOf(desiredObj).IsNil() observedObjIsNil := reflect.ValueOf(observedObj).IsNil() if !desiredObjIsNil && observedObjIsNil { return reconciler.createComponent(ctx, desiredObj, component) } if !desiredObjIsNil && !observedObjIsNil { var cluster = reconciler.observed.cluster if shouldUpdateCluster(&reconciler.observed) && !isComponentUpdated(observedObj, cluster) { var err error if shouldRecreateOnUpdate(&reconciler.observed) { err = reconciler.deleteComponent(ctx, desiredObj, component) } else { err = reconciler.updateComponent(ctx, desiredObj, component) } if err != nil { return err } return nil } log.Info("Component already exists, no action", "component", component) return nil } if desiredObjIsNil && !observedObjIsNil { return reconciler.deleteComponent(ctx, observedObj, component) } return nil } func (reconciler *ClusterReconciler) reconcileHorizontalPodAutoscaler(ctx context.Context) error { return reconciler.reconcileComponent( ctx, "HorizontalPodAutoscaler", reconciler.desired.HorizontalPodAutoscaler, reconciler.observed.horizontalPodAutoscaler) } func (reconciler *ClusterReconciler) reconcileTaskManagerService(ctx context.Context) error { var desiredTmService = reconciler.desired.TmService var observedTmService = reconciler.observed.tmService if desiredTmService != nil && observedTmService != nil { // v1.Service API does not handle update correctly when below values are empty. desiredTmService.SetResourceVersion(observedTmService.GetResourceVersion()) desiredTmService.Spec.ClusterIP = observedTmService.Spec.ClusterIP } return reconciler.reconcileComponent(ctx, "TaskManagerService", desiredTmService, observedTmService) } func (reconciler *ClusterReconciler) createComponent( ctx context.Context, obj client.Object, component string) error { log := logr.FromContextOrDiscard(ctx). WithValues("component", component). WithValues("object", obj) if err := reconciler.k8sClient.Create(ctx, obj); err != nil { log.Error(err, "Failed to create") return err } log.Info("Created") return nil } func (reconciler *ClusterReconciler) updateComponent(ctx context.Context, desired client.Object, component string) error { log := logr.FromContextOrDiscard(ctx). WithValues("component", component). WithValues("object", desired) var k8sClient = reconciler.k8sClient if err := k8sClient.Update(ctx, desired); err != nil { log.Error(err, "Failed to update component for update") return err } log.Info("Updated") return nil } func (reconciler *ClusterReconciler) deleteComponent( ctx context.Context, obj client.Object, component string) error { log := logr.FromContextOrDiscard(ctx). WithValues("component", component). WithValues("object", obj) var k8sClient = reconciler.k8sClient var err = k8sClient.Delete(ctx, obj) if client.IgnoreNotFound(err) != nil { log.Error(err, "Failed to delete", component, obj) } log.Info("Deleted") return nil } func (reconciler *ClusterReconciler) reconcileJobManagerService(ctx context.Context) error { var desiredJmService = reconciler.desired.JmService var observedJmService = reconciler.observed.jmService if desiredJmService != nil && observedJmService != nil { // v1.Service API does not handle update correctly when below values are empty. desiredJmService.SetResourceVersion(observedJmService.GetResourceVersion()) desiredJmService.Spec.ClusterIP = observedJmService.Spec.ClusterIP } return reconciler.reconcileComponent(ctx, "JobManagerService", desiredJmService, observedJmService) } func (reconciler *ClusterReconciler) reconcileJobManagerIngress(ctx context.Context) error { var desiredJmIngress = reconciler.desired.JmIngress var observedJmIngress = reconciler.observed.jmIngress return reconciler.reconcileComponent(ctx, "JobManagerIngress", desiredJmIngress, observedJmIngress) } func (reconciler *ClusterReconciler) reconcileConfigMap(ctx context.Context) error { var desiredConfigMap = reconciler.desired.ConfigMap var observedConfigMap = reconciler.observed.configMap return reconciler.reconcileComponent(ctx, "ConfigMap", desiredConfigMap, observedConfigMap) } // Set the owner reference of the cluster to the HA ConfigMap (if it doesn't already have one) func (reconciler *ClusterReconciler) reconcileHAConfigMap(ctx context.Context) error { var observedHAConfigMap = reconciler.observed.haConfigMap if observedHAConfigMap == nil { return nil } if observedHAConfigMap.OwnerReferences == nil || len(observedHAConfigMap.OwnerReferences) == 0 { observedHAConfigMap.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(reconciler.observed.cluster)} err := reconciler.updateComponent(ctx, observedHAConfigMap, "HA ConfigMap") if err != nil { return err } } return nil } func (reconciler *ClusterReconciler) reconcilePodDisruptionBudget(ctx context.Context) error { desiredPodDisruptionBudget := reconciler.desired.PodDisruptionBudget observedPodDisruptionBudget := reconciler.observed.podDisruptionBudget if desiredPodDisruptionBudget != nil && observedPodDisruptionBudget != nil { // When updating a PodDisruptionBudget, the resource version must be set! // Setting the resource version to the observed resource version ensures that the update is not rejected desiredPodDisruptionBudget.SetResourceVersion(observedPodDisruptionBudget.ResourceVersion) } return reconciler.reconcileComponent( ctx, "PodDisruptionBudget", desiredPodDisruptionBudget, observedPodDisruptionBudget) } func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaims(ctx context.Context) error { observed := reconciler.observed pvcs := observed.persistentVolumeClaims jm := observed.jmStatefulSet tm := observed.tmStatefulSet for _, pvc := range pvcs.Items { if c, ok := pvc.Labels["component"]; ok && c == "jobmanager" && jm != nil { reconciler.reconcilePersistentVolumeClaim(ctx, &pvc, jm) } if c, ok := pvc.Labels["component"]; ok && c == "taskmanager" && tm != nil { reconciler.reconcilePersistentVolumeClaim(ctx, &pvc, tm) } } return nil } func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaim(ctx context.Context, pvc *corev1.PersistentVolumeClaim, sset *appsv1.StatefulSet) error { log := logr.FromContextOrDiscard(ctx) k8sClient := reconciler.k8sClient for _, ownerRef := range pvc.GetOwnerReferences() { if ownerRef.Kind == sset.Kind { return nil } } patch := fmt.Sprintf( `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":false}],"uid":"%s"}}`, sset.APIVersion, sset.Kind, sset.GetName(), sset.GetUID(), pvc.GetUID(), ) err := k8sClient.Patch(ctx, pvc, client.RawPatch(types.MergePatchType, []byte(patch))) if err != nil { log.Error(err, "Failed to update PersistentVolumeClaim") } else { log.Info("PersistentVolumeClaim patched") } return err } func (reconciler *ClusterReconciler) reconcileJob(ctx context.Context) (ctrl.Result, error) { log := logr.FromContextOrDiscard(ctx) var desiredJob = reconciler.desired.Job var observed = reconciler.observed var recorded = observed.cluster.Status var jobSpec = observed.cluster.Spec.Job var job = recorded.Components.Job var err error var jobID = reconciler.getFlinkJobID() // Update status changed via job reconciliation. var newSavepointStatus *v1beta1.SavepointStatus var newControlStatus *v1beta1.FlinkClusterControlStatus defer reconciler.updateStatus(ctx, &newSavepointStatus, &newControlStatus) observedSubmitter := observed.flinkJobSubmitter.job if desiredJob != nil && job.IsTerminated(jobSpec) { return ctrl.Result{}, nil } if wasJobCancelRequested(observed.cluster.Status.Control) { log.Info("Force tearing down the job") userControl := getNewControlRequest(observed.cluster) if userControl == v1beta1.ControlNameJobCancel { newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) } // cancel all running jobs if job.IsActive() { if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil && !errors.IsResourceExpired(err) { return requeueResult, err } } // kill job submitter pod if observedSubmitter != nil { if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil { return requeueResult, err } } } // Create new Flink job submitter when starting new job, updating job or restarting job in failure. if desiredJob != nil && !job.IsActive() { log.Info("Deploying Flink job") // TODO: Record event or introduce Condition in CRD status to notify update state pended. // https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376 var unexpectedJobs = observed.flinkJob.unexpected if len(unexpectedJobs) > 0 { // This is an exceptional situation. // There should be no jobs because all jobs are terminated in the previous iterations. // In this case user should identify the problem so that the job is not executed multiple times unintentionally // cause of Flink error, Flink operator error or other unknown error. // If user want to proceed, unexpected jobs should be terminated. log.Error(errors.NewInternalError(fmt.Errorf("unexpected jobs found")), "Failed to create job submitter", "unexpected jobs", unexpectedJobs) return ctrl.Result{}, nil } // Create Flink job submitter log.Info("Updating job status to proceed creating new job submitter") // Job status must be updated before creating a job submitter to ensure the observed job is the job submitted by the operator. err = reconciler.updateJobDeployStatus(ctx) if err != nil { log.Info("Failed to update the job status for job submission") return requeueResult, err } cr := getCurrentRevisionName(&observed.cluster.Status.Revision) if observedSubmitter != nil { if observedSubmitter.Labels[RevisionNameLabel] == cr { log.Info("Found old job submitter") err = reconciler.deleteJob(ctx, observedSubmitter) if err != nil { return requeueResult, err } } else if observedSubmitter.Status.Failed >= 1 { log.Info("Found failed job submitter") err = reconciler.deleteJob(ctx, observedSubmitter) if err != nil { return requeueResult, err } } else { log.Info("Found job submitter, wait for it to be active or failed") return requeueResult, nil } } else { err = reconciler.createJob(ctx, desiredJob) } return requeueResult, err } if desiredJob != nil && job.IsActive() { if job.State == v1beta1.JobStateDeploying { log.Info("Job submitter is deployed, wait until completed") return requeueResult, nil } // Suspend or stop job to proceed update. if recorded.Revision.IsUpdateTriggered() && !isScaleUpdate(observed.revisions, observed.cluster) { log.Info("Preparing job update") var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint) if shouldSuspend { newSavepointStatus, err = reconciler.trySuspendJob(ctx) } else if shouldUpdateJob(&observed) { err = reconciler.cancelJob(ctx) } return requeueResult, err } // Trigger savepoint if required. if len(jobID) > 0 { var savepointReason = reconciler.shouldTakeSavepoint() if savepointReason != "" { newSavepointStatus, err = reconciler.triggerSavepoint(ctx, jobID, savepointReason, false) } // Get new control status when the savepoint reason matches the requested control. var userControl = getNewControlRequest(observed.cluster) if userControl == v1beta1.ControlNameSavepoint && savepointReason == v1beta1.SavepointReasonUserRequested { newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) } return requeueResult, err } log.Info("Job is not finished yet, no action", "jobID", jobID) return requeueResult, nil } // Job finished. Stop Flink job and kill job-submitter. if desiredJob == nil && (!job.IsStopped() || observedSubmitter != nil) { if job.IsActive() { userControl := getNewControlRequest(observed.cluster) if userControl == v1beta1.ControlNameJobCancel { newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) } log.Info("Stopping job", "jobID", jobID) if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil { return requeueResult, err } } else if job.IsStopped() && observedSubmitter != nil { if observed.cluster.Status.Components.Job.SubmitterExitCode == -1 { log.Info("Job submitter has not finished yet") return requeueResult, err } if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil { return requeueResult, err } } // to make sure the job is stopped return requeueResult, nil } if job.IsStopped() { log.Info("Job has finished, no action") } return ctrl.Result{}, nil } func (reconciler *ClusterReconciler) createJob(ctx context.Context, job *batchv1.Job) error { log := logr.FromContextOrDiscard(ctx) var k8sClient = reconciler.k8sClient log.Info("Creating job submitter", "resource", *job) var err = k8sClient.Create(ctx, job) if err != nil { log.Info("Failed to created job submitter", "error", err) } else { log.Info("Job submitter created") } return err } func (reconciler *ClusterReconciler) deleteJob(ctx context.Context, job *batchv1.Job) error { log := logr.FromContextOrDiscard(ctx) var k8sClient = reconciler.k8sClient var deletePolicy = metav1.DeletePropagationBackground var deleteOption = client.DeleteOptions{PropagationPolicy: &deletePolicy} log.Info("Deleting job submitter", "job", job) var err = k8sClient.Delete(ctx, job, &deleteOption) err = client.IgnoreNotFound(err) if err != nil { log.Error(err, "Failed to delete job submitter") } else { log.Info("Job submitter deleted") } return err } func (reconciler *ClusterReconciler) getFlinkJobID() string { var jobStatus = reconciler.observed.cluster.Status.Components.Job if jobStatus != nil && len(jobStatus.ID) > 0 { return jobStatus.ID } return "" } func (reconciler *ClusterReconciler) trySuspendJob(ctx context.Context) (*v1beta1.SavepointStatus, error) { log := logr.FromContextOrDiscard(ctx) var recorded = reconciler.observed.cluster.Status if !canTakeSavepoint(reconciler.observed.cluster) { return nil, nil } var jobID = reconciler.getFlinkJobID() log.Info("Checking the conditions for progressing") var canSuspend = reconciler.canSuspendJob(ctx, jobID, recorded.Savepoint) if canSuspend { log.Info("Triggering savepoint for suspending job") var newSavepointStatus, err = reconciler.triggerSavepoint(ctx, jobID, v1beta1.SavepointReasonUpdate, true) if err != nil { log.Info("Failed to trigger savepoint", "jobID", jobID, "triggerID", newSavepointStatus.TriggerID, "error", err) } else { log.Info("Successfully savepoint triggered", "jobID", jobID, "triggerID", newSavepointStatus.TriggerID) } return newSavepointStatus, err } return nil, nil } func (reconciler *ClusterReconciler) cancelJob(ctx context.Context) error { log := logr.FromContextOrDiscard(ctx) var observedFlinkJob = reconciler.observed.flinkJob.status log.Info("Stopping Flink job", "", observedFlinkJob) var err = reconciler.cancelRunningJobs(ctx, false /* takeSavepoint */) if err != nil { log.Info("Failed to stop Flink job") return err } // TODO: Not to delete the job submitter immediately, and retain the latest ones for inspection. var observedSubmitter = reconciler.observed.flinkJobSubmitter.job if observedSubmitter != nil { var err = reconciler.deleteJob(ctx, observedSubmitter) if err != nil { log.Error( err, "Failed to delete job submitter", "job", observedSubmitter) return err } } return nil } func (reconciler *ClusterReconciler) cancelUnexpectedJobs( ctx context.Context, takeSavepoint bool) error { var unexpectedJobs = reconciler.observed.flinkJob.unexpected return reconciler.cancelJobs(ctx, takeSavepoint, unexpectedJobs) } // Cancel running jobs. func (reconciler *ClusterReconciler) cancelRunningJobs( ctx context.Context, takeSavepoint bool) error { var runningJobs = reconciler.observed.flinkJob.unexpected var flinkJob = reconciler.observed.flinkJob.status if flinkJob != nil && flinkJob.Id != "" && getFlinkJobDeploymentState(flinkJob.State) == v1beta1.JobStateRunning { runningJobs = append(runningJobs, flinkJob.Id) } if len(runningJobs) == 0 { return errors.NewResourceExpired("no running Flink jobs to stop") } return reconciler.cancelJobs(ctx, takeSavepoint, runningJobs) } // Cancel jobs. func (reconciler *ClusterReconciler) cancelJobs( ctx context.Context, takeSavepoint bool, jobs []string) error { log := logr.FromContextOrDiscard(ctx) for _, jobID := range jobs { log.Info("Cancel running job", "jobID", jobID) var err = reconciler.cancelFlinkJob(ctx, jobID, takeSavepoint) if err != nil { log.Error(err, "Failed to cancel running job", "jobID", jobID) return err } } return nil } // Takes a savepoint if possible then stops the job. func (reconciler *ClusterReconciler) cancelFlinkJob(ctx context.Context, jobID string, takeSavepoint bool) error { log := logr.FromContextOrDiscard(ctx) if takeSavepoint && canTakeSavepoint(reconciler.observed.cluster) { log.Info("Taking savepoint before stopping job", "jobID", jobID) var err = reconciler.takeSavepoint(ctx, jobID) if err != nil { return err } } var apiBaseURL = getFlinkAPIBaseURL(reconciler.observed.cluster) log.Info("Stoping job", "jobID", jobID) return reconciler.flinkClient.StopJob(apiBaseURL, jobID) } // canSuspendJob func (reconciler *ClusterReconciler) canSuspendJob(ctx context.Context, jobID string, s *v1beta1.SavepointStatus) bool { log := logr.FromContextOrDiscard(ctx) var firstTry = !finalSavepointRequested(jobID, s) if firstTry { return true } switch s.State { case v1beta1.SavepointStateSucceeded: log.Info("Successfully savepoint completed, wait until the job stops") return true case v1beta1.SavepointStateInProgress: log.Info("Savepoint is in progress, wait until it is completed") return false case v1beta1.SavepointStateTriggerFailed: log.Info("Savepoint trigger failed in previous request") case v1beta1.SavepointStateFailed: log.Info("Savepoint failed on previous request") } var retryTimeArrived = hasTimeElapsed(s.UpdateTime, time.Now(), SavepointRetryIntervalSeconds) if !retryTimeArrived { log.Info("Wait until next retry time arrived") } return retryTimeArrived } func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReason { var observed = reconciler.observed var cluster = observed.cluster var jobSpec = observed.cluster.Spec.Job var job = observed.cluster.Status.Components.Job var savepoint = observed.cluster.Status.Savepoint var newRequestedControl = getNewControlRequest(cluster) if !canTakeSavepoint(reconciler.observed.cluster) { return "" } // Savepoint trigger priority is user request including update and job stop. switch { // TODO: spec.job.cancelRequested will be deprecated // Should stop job with savepoint by user requested control case newRequestedControl == v1beta1.ControlNameJobCancel || (jobSpec.CancelRequested != nil && *jobSpec.CancelRequested): return v1beta1.SavepointReasonJobCancel // Take savepoint by user request case newRequestedControl == v1beta1.ControlNameSavepoint: fallthrough // TODO: spec.job.savepointGeneration will be deprecated case jobSpec.SavepointGeneration > job.SavepointGeneration: // Triggered by savepointGeneration increased. // When previous savepoint is failed, savepoint trigger by spec.job.savepointGeneration is not possible // because the field cannot be increased more. // Note: checkSavepointGeneration in flinkcluster_validate.go return v1beta1.SavepointReasonUserRequested // Scheduled auto savepoint case jobSpec.AutoSavepointSeconds != nil: // When previous try was failed, check retry interval. if savepoint.IsFailed() && savepoint.TriggerReason == v1beta1.SavepointReasonScheduled { var nextRetryTime = util.GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second) if time.Now().After(nextRetryTime) { return v1beta1.SavepointReasonScheduled } else { return "" } } // Check if next trigger time arrived. var compareTime string if len(job.SavepointTime) == 0 { compareTime = job.StartTime } else { compareTime = job.SavepointTime } var nextTime = getTimeAfterAddedSeconds(compareTime, int64(*jobSpec.AutoSavepointSeconds)) if time.Now().After(nextTime) { return v1beta1.SavepointReasonScheduled } } return "" } // Trigger savepoint for a job then return savepoint status to update. func (reconciler *ClusterReconciler) triggerSavepoint( ctx context.Context, jobID string, triggerReason v1beta1.SavepointReason, cancel bool) (*v1beta1.SavepointStatus, error) { log := logr.FromContextOrDiscard(ctx) var cluster = reconciler.observed.cluster var apiBaseURL = getFlinkAPIBaseURL(reconciler.observed.cluster) var triggerSuccess bool var savepointTriggerID *flink.SavepointTriggerID var triggerID string var message string var err error log.Info(fmt.Sprintf("Trigger savepoint for %s", triggerReason), "jobID", jobID) savepointTriggerID, err = reconciler.flinkClient.TriggerSavepoint(apiBaseURL, jobID, *cluster.Spec.Job.SavepointsDir, cancel) if err != nil { // limit message size to 1KiB if message = err.Error(); len(message) > 1024 { message = message[:1024] + "..." } triggerSuccess = false log.Info("Failed to trigger savepoint", "jobID", jobID, "triggerID", triggerID, "error", err) } else { triggerSuccess = true triggerID = savepointTriggerID.RequestID log.Info("Successfully savepoint triggered", "jobID", jobID, "triggerID", triggerID) } newSavepointStatus := reconciler.getNewSavepointStatus(triggerID, triggerReason, message, triggerSuccess) return newSavepointStatus, err } // Takes savepoint for a job then update job status with the info. func (reconciler *ClusterReconciler) takeSavepoint(ctx context.Context, jobID string) error { log := logr.FromContextOrDiscard(ctx) apiBaseURL := getFlinkAPIBaseURL(reconciler.observed.cluster) log.Info("Taking savepoint.", "jobID", jobID) status, err := reconciler.flinkClient.TakeSavepoint(apiBaseURL, jobID, *reconciler.observed.cluster.Spec.Job.SavepointsDir) log.Info("Savepoint status.", "status", status, "error", err) if err == nil && len(status.FailureCause.StackTrace) > 0 { err = fmt.Errorf("%s", status.FailureCause.StackTrace) } if err != nil || !status.Completed { log.Info("Failed to take savepoint.", "jobID", jobID) } return err } func (reconciler *ClusterReconciler) updateStatus( ctx context.Context, ss **v1beta1.SavepointStatus, cs **v1beta1.FlinkClusterControlStatus) { log := logr.FromContextOrDiscard(ctx) var savepointStatus = *ss var controlStatus = *cs if savepointStatus == nil && controlStatus == nil { return } // Record events if savepointStatus != nil { eventType, eventReason, eventMessage := getSavepointEvent(*savepointStatus) reconciler.recorder.Event(reconciler.observed.cluster, eventType, eventReason, eventMessage) } if controlStatus != nil { eventType, eventReason, eventMessage := getControlEvent(*controlStatus) reconciler.recorder.Event(reconciler.observed.cluster, eventType, eventReason, eventMessage) } // Update status var clusterClone = reconciler.observed.cluster.DeepCopy() var statusUpdateErr error retry.RetryOnConflict(retry.DefaultBackoff, func() error { var newStatus = &clusterClone.Status if savepointStatus != nil { newStatus.Savepoint = savepointStatus } if controlStatus != nil { newStatus.Control = controlStatus } util.SetTimestamp(&newStatus.LastUpdateTime) log.Info("Updating cluster status", "clusterClone", clusterClone, "newStatus", newStatus) statusUpdateErr = reconciler.k8sClient.Status().Update(ctx, clusterClone) if statusUpdateErr == nil { return nil } var clusterUpdated v1beta1.FlinkCluster if err := reconciler.k8sClient.Get( ctx, types.NamespacedName{Namespace: clusterClone.Namespace, Name: clusterClone.Name}, &clusterUpdated); err == nil { clusterClone = clusterUpdated.DeepCopy() } return statusUpdateErr }) if statusUpdateErr != nil { log.Error( statusUpdateErr, "Failed to update status.", "error", statusUpdateErr) } } func (reconciler *ClusterReconciler) updateJobDeployStatus(ctx context.Context) error { var log = logr.FromContextOrDiscard(ctx) var observedCluster = reconciler.observed.cluster var desiredJobSubmitter = reconciler.desired.Job var err error var clusterClone = observedCluster.DeepCopy() var newJob = clusterClone.Status.Components.Job // Reset running job information. // newJob.ID = "" newJob.StartTime = "" newJob.CompletionTime = nil // Mark as job submitter is deployed. util.SetTimestamp(&newJob.DeployTime) util.SetTimestamp(&clusterClone.Status.LastUpdateTime) // Latest savepoint location should be fromSavepoint. var fromSavepoint = getFromSavepoint(desiredJobSubmitter.Spec) newJob.FromSavepoint = fromSavepoint if newJob.SavepointLocation != "" { newJob.SavepointLocation = fromSavepoint } // Update job status. err = reconciler.k8sClient.Status().Update(ctx, clusterClone) if err != nil { log.Error( err, "Failed to update job status for new job submitter", "error", err) } else { log.Info("Succeeded to update job status for new job submitter.", "job status", newJob) } return err } // getNewSavepointStatus returns newly triggered savepoint status. func (reconciler *ClusterReconciler) getNewSavepointStatus(triggerID string, triggerReason v1beta1.SavepointReason, message string, triggerSuccess bool) *v1beta1.SavepointStatus { var jobID = reconciler.getFlinkJobID() var savepointState string var now string util.SetTimestamp(&now) if triggerSuccess { savepointState = v1beta1.SavepointStateInProgress } else { savepointState = v1beta1.SavepointStateTriggerFailed } var savepointStatus = &v1beta1.SavepointStatus{ JobID: jobID, TriggerID: triggerID, TriggerReason: triggerReason, TriggerTime: now, UpdateTime: now, Message: message, State: savepointState, } return savepointStatus } // Convert raw time to object and add `addedSeconds` to it, // getting a time object for the parsed `rawTime` with `addedSeconds` added to it. func getTimeAfterAddedSeconds(rawTime string, addedSeconds int64) time.Time { var tc = &util.TimeConverter{} var lastTriggerTime = time.Time{} if len(rawTime) != 0 { lastTriggerTime = tc.FromString(rawTime) } return lastTriggerTime.Add(time.Duration(addedSeconds * int64(time.Second))) }