func()

in controllers/flinkcluster/flinkcluster_reconciler.go [406:566]


// reconcileJob performs one reconciliation pass over the Flink job and its
// job submitter, comparing the desired job (reconciler.desired.Job) with the
// observed cluster state (reconciler.observed).
//
// The pass proceeds through the following stages, returning from the first
// one that applies (the cancel-request stage is the exception: it falls
// through to the later stages when it succeeds):
//
//  1. A job is desired but the recorded job reports IsTerminated for the
//     current job spec: nothing to do.
//  2. A job cancel was requested: cancel running jobs and delete the
//     submitter.
//  3. A job is desired and the recorded job is not active: update the deploy
//     status, then clean up a stale/failed submitter or create a new one.
//  4. A job is desired and the recorded job is active: wait for deployment,
//     prepare an update (suspend or cancel), or trigger a savepoint.
//  5. No job is desired but the job is not stopped or a submitter still
//     exists: stop the job and/or delete the submitter.
//
// Any savepoint or control status produced along the way is handed to
// reconciler.updateStatus by the deferred call when the function returns.
//
// It returns requeueResult whenever further passes are expected, and an empty
// ctrl.Result when no action is pending. A non-nil error is returned when one
// of the cancel/delete/create/update helpers fails.
func (reconciler *ClusterReconciler) reconcileJob(ctx context.Context) (ctrl.Result, error) {
	log := logr.FromContextOrDiscard(ctx)
	var desiredJob = reconciler.desired.Job
	var observed = reconciler.observed
	var recorded = observed.cluster.Status
	var jobSpec = observed.cluster.Spec.Job
	var job = recorded.Components.Job
	var err error
	var jobID = reconciler.getFlinkJobID()

	// Update status changed via job reconciliation.
	// The deferred call receives pointers to these locals, so it sees whatever
	// values the branches below assign before returning.
	var newSavepointStatus *v1beta1.SavepointStatus
	var newControlStatus *v1beta1.FlinkClusterControlStatus
	defer reconciler.updateStatus(ctx, &newSavepointStatus, &newControlStatus)

	observedSubmitter := observed.flinkJobSubmitter.job

	if desiredJob != nil && job.IsTerminated(jobSpec) {
		return ctrl.Result{}, nil
	}

	if wasJobCancelRequested(observed.cluster.Status.Control) {
		log.Info("Force tearing down the job")
		userControl := getNewControlRequest(observed.cluster)
		if userControl == v1beta1.ControlNameJobCancel {
			newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
		}
		// cancel all running jobs
		if job.IsActive() {
			// A "resource expired" error is tolerated here so that the
			// submitter cleanup below still runs.
			if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil && !errors.IsResourceExpired(err) {
				return requeueResult, err
			}
		}
		// kill job submitter pod
		if observedSubmitter != nil {
			if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil {
				return requeueResult, err
			}
		}
		// No return here: reconciliation continues with the stages below.
	}

	// Create new Flink job submitter when starting new job, updating job or restarting job in failure.
	if desiredJob != nil && !job.IsActive() {
		log.Info("Deploying Flink job")

		// TODO: Record event or introduce Condition in CRD status to notify update state pended.
		// https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376
		var unexpectedJobs = observed.flinkJob.unexpected
		if len(unexpectedJobs) > 0 {
			// This is an exceptional situation.
			// There should be no jobs because all jobs are terminated in the previous iterations.
			// In this case the user should identify the problem so that the job is not executed multiple times
			// unintentionally because of a Flink error, a Flink operator error or another unknown error.
			// If the user wants to proceed, the unexpected jobs should be terminated.
			// The error is only logged; the pass ends without requeue and without returning an error.
			log.Error(errors.NewInternalError(fmt.Errorf("unexpected jobs found")), "Failed to create job submitter", "unexpected jobs", unexpectedJobs)
			return ctrl.Result{}, nil
		}

		// Create Flink job submitter
		log.Info("Updating job status to proceed creating new job submitter")
		// Job status must be updated before creating a job submitter to ensure the observed job is the job submitted by the operator.
		err = reconciler.updateJobDeployStatus(ctx)
		if err != nil {
			log.Info("Failed to update the job status for job submission")
			return requeueResult, err
		}

		cr := getCurrentRevisionName(&observed.cluster.Status.Revision)
		if observedSubmitter != nil {
			if observedSubmitter.Labels[RevisionNameLabel] != cr {
				// The submitter's revision label differs from the current
				// revision, so it is a leftover from an earlier revision.
				// Delete it; a new one is created in a later pass once it is gone.
				log.Info("Found old job submitter")
				err = reconciler.deleteJob(ctx, observedSubmitter)
				if err != nil {
					return requeueResult, err
				}
			} else if observedSubmitter.Status.Failed >= 1 {
				// Current revision, but the submitter has failed: remove it so
				// it can be recreated in a later pass.
				log.Info("Found failed job submitter")
				err = reconciler.deleteJob(ctx, observedSubmitter)
				if err != nil {
					return requeueResult, err
				}
			} else {
				// Current revision and not failed: leave it alone and check again later.
				log.Info("Found job submitter, wait for it to be active or failed")
				return requeueResult, nil
			}
		} else {
			err = reconciler.createJob(ctx, desiredJob)
		}

		return requeueResult, err
	}

	if desiredJob != nil && job.IsActive() {
		if job.State == v1beta1.JobStateDeploying {
			log.Info("Job submitter is deployed, wait until completed")
			return requeueResult, nil
		}

		// Suspend or stop job to proceed update.
		if recorded.Revision.IsUpdateTriggered() && !isScaleUpdate(observed.revisions, observed.cluster) {
			log.Info("Preparing job update")
			// TakeSavepointOnUpdate defaults to true when unset.
			var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate
			// Suspend (with savepoint) only when no explicit FromSavepoint is given in the spec.
			var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint)
			if shouldSuspend {
				newSavepointStatus, err = reconciler.trySuspendJob(ctx)
			} else if shouldUpdateJob(&observed) {
				err = reconciler.cancelJob(ctx)
			}
			return requeueResult, err
		}

		// Trigger savepoint if required.
		if len(jobID) > 0 {
			var savepointReason = reconciler.shouldTakeSavepoint()
			if savepointReason != "" {
				newSavepointStatus, err = reconciler.triggerSavepoint(ctx, jobID, savepointReason, false)
			}
			// Get new control status when the savepoint reason matches the requested control.
			var userControl = getNewControlRequest(observed.cluster)
			if userControl == v1beta1.ControlNameSavepoint && savepointReason == v1beta1.SavepointReasonUserRequested {
				newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
			}
			return requeueResult, err
		}

		log.Info("Job is not finished yet, no action", "jobID", jobID)
		return requeueResult, nil
	}

	// Job finished. Stop Flink job and kill job-submitter.
	if desiredJob == nil && (!job.IsStopped() || observedSubmitter != nil) {
		if job.IsActive() {
			userControl := getNewControlRequest(observed.cluster)
			if userControl == v1beta1.ControlNameJobCancel {
				newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
			}

			log.Info("Stopping job", "jobID", jobID)
			if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil {
				return requeueResult, err
			}
		} else if job.IsStopped() && observedSubmitter != nil {
			// An exit code of -1 is treated as "submitter still running";
			// do not delete it yet.
			if observed.cluster.Status.Components.Job.SubmitterExitCode == -1 {
				log.Info("Job submitter has not finished yet")
				return requeueResult, nil
			}
			if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil {
				return requeueResult, err
			}
		}

		// to make sure the job is stopped
		return requeueResult, nil
	}

	if job.IsStopped() {
		log.Info("Job has finished, no action")
	}

	return ctrl.Result{}, nil
}