func (updater *ClusterStatusUpdater) deriveJobStatus()

in controllers/flinkcluster/flinkcluster_updater.go [623:801]


func (updater *ClusterStatusUpdater) deriveJobStatus(ctx context.Context) *v1beta1.JobStatus {
	log := logr.FromContextOrDiscard(ctx)

	var observed = updater.observed
	var observedCluster = observed.cluster
	var jobSpec = observedCluster.Spec.Job
	if jobSpec == nil {
		return nil
	}

	var observedSubmitter = observed.flinkJobSubmitter
	var observedFlinkJob = observed.flinkJob.status
	var observedSavepoint = observed.savepoint
	var recorded = observedCluster.Status
	var savepoint = recorded.Savepoint
	var oldJob = recorded.Components.Job
	var newJob *v1beta1.JobStatus

	// Derive new job state.
	if oldJob != nil {
		newJob = oldJob.DeepCopy()
	} else {
		newJob = new(v1beta1.JobStatus)
	}

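	// Record the submitter job's name and its exit code; a derived exit code of -1 indicates the submitter is still running.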
	if observedSubmitter.job != nil {
		newJob.SubmitterName = observedSubmitter.job.Name
		exitCode, _ := updater.deriveJobSubmitterExitCodeAndReason(observed.flinkJobSubmitter.pod, observed.flinkJobSubmitter.job)
		newJob.SubmitterExitCode = exitCode
	} else if observedSubmitter.job == nil || observed.flinkJobSubmitter.pod == nil {
		// The submitter job or its pod is gone, so the recorded exit code should no longer be -1 ("running"); reset it to 0.
		if oldJob != nil && oldJob.SubmitterExitCode == -1 {
			newJob.SubmitterExitCode = 0
		}
	}

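	// Derive the new job state. The switch cases are evaluated in order: checks
	// against the recorded state come first, then the job observed through the
	// Flink API, and finally fallbacks based on the submitter batch job.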
	var newJobState v1beta1.JobState
	switch {
	case oldJob == nil:
		newJobState = v1beta1.JobStatePending
	case shouldUpdateJob(&observed):
		newJobState = v1beta1.JobStateUpdating
	case oldJob.IsStopped():
		// When a new job is deploying, update the job state to deploying.
		if observedSubmitter.job != nil && (observedSubmitter.job.Status.Active == 1 || isJobInitialising(observedSubmitter.job.Status)) {
			newJobState = v1beta1.JobStateDeploying
		} else {
			newJobState = oldJob.State
		}
	case oldJob.IsPending() && oldJob.DeployTime != "":
		newJobState = v1beta1.JobStateDeploying
	// Derive the job state from the observed Flink job, if it exists.
	case observedFlinkJob != nil:
		if observedFlinkJob.Id != "" {
			newJob.ID = observedFlinkJob.Id
		}
		if observedFlinkJob.Name != "" {
			newJob.Name = observedFlinkJob.Name
		}
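		// Map the state reported by the Flink API to the operator's job state.
		// It is used directly when there is no submitter job, or when the
		// submitter has not failed and the Flink job has not succeeded yet;
		// otherwise fall through to derive the state from the submitter below.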
		tmpState := getFlinkJobDeploymentState(observedFlinkJob.State)
		if observedSubmitter.job == nil ||
			(observedSubmitter.job.Status.Failed < 1 && tmpState != v1beta1.JobStateSucceeded) {
			newJobState = tmpState
			break
		}
		log.Info("The submitter maybe still running. Waiting for it")
		fallthrough
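	// Reached via the fallthrough above, or when an active job's submitter has
	// finished: derive the final state from the submitter batch job's result.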
	case oldJob.IsActive() && observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 0:
		if observedSubmitter.job.Status.Succeeded == 1 {
			newJobState = v1beta1.JobStateSucceeded
			if newJob.SubmitterExitCode == -1 {
				log.Info("Job succeeded but the exit code is -1. This is an edge case that may " +
					"happen if the controller is down or busy for a long time and the submitter pod is deleted externally " +
					"including by kube-system:pod-garbage-collector. Changing exit code to 0.")
				newJob.SubmitterExitCode = 0
			}
		} else if observedSubmitter.job.Status.Failed == 1 {
			newJobState = v1beta1.JobStateFailed
		} else {
			newJobState = oldJob.State
		}
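	// Stopping the job has been requested for this cluster.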
	case shouldStopJob(observedCluster):
		newJobState = v1beta1.JobStateCancelled
	// The Flink job was not observed: either it is missing from the JobManager or the JobManager is unavailable.
	case isFlinkAPIReady(observed.flinkJob.list):
		if oldJob.State == v1beta1.JobStateRunning {
			newJobState = v1beta1.JobStateLost
			break
		}
		fallthrough
	default:
		// Maintain the job state as recorded if job is not being deployed.
		if oldJob.State != v1beta1.JobStateDeploying {
			newJobState = oldJob.State
			break
		}
		// The job should be in deployment, but the submitter was not found or tracking it failed.
		var jobDeployState = observedSubmitter.getState()
		if observedSubmitter.job == nil || jobDeployState == JobDeployStateUnknown {
			newJobState = v1beta1.JobStateLost
			break
		}
		// The job submitter was deployed but failed, so the submission clearly
		// failed even though the JobManager has not confirmed it.
		if jobDeployState == JobDeployStateFailed {
			newJobState = v1beta1.JobStateDeployFailed
			break
		}

		newJobState = oldJob.State
	}
	// Update State
	newJob.State = newJobState

	// Derive the new job status fields if the state has changed.
	if oldJob == nil || oldJob.State != newJob.State {
		// TODO: It would be ideal to set the times with the timestamp retrieved from the Flink API like /jobs/{job-id}.
		switch {
		case newJob.IsPending():
			newJob.DeployTime = ""
			switch newJob.State {
			case v1beta1.JobStateUpdating:
				newJob.RestartCount = 0
			case v1beta1.JobStateRestarting:
				newJob.RestartCount++
			}
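		// The job has just entered the running state: record the start time and clear any previous completion time.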
		case newJob.State == v1beta1.JobStateRunning:
			util.SetTimestamp(&newJob.StartTime)
			newJob.CompletionTime = nil
			// Once the job has started, the recorded savepoint is no longer the final state of the job.
			if oldJob.FinalSavepoint {
				newJob.FinalSavepoint = false
			}
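		// The job has failed: collect failure reasons from the observed Flink job exceptions,
		// falling back to the submitter log, then fall through to the stopped-state handling.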
		case newJob.IsFailed():
			if len(newJob.FailureReasons) == 0 {
				newJob.FailureReasons = []string{}
				exceptions := observed.flinkJob.exceptions
				if exceptions != nil && len(exceptions.Exceptions) > 0 {
					for _, e := range exceptions.Exceptions {
						newJob.FailureReasons = append(newJob.FailureReasons, e.Exception)
					}
				} else if observedSubmitter.log != nil {
					newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message)
				}
			}
			fallthrough
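		// Any stopped state (also reached via fallthrough from the failed case): set the completion time if it has not been set yet.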
		case newJob.IsStopped():
			if newJob.CompletionTime.IsZero() {
				now := metav1.Now()
				newJob.CompletionTime = &now
			}
			// When the job is lost (tracking failed), we cannot guarantee that the savepoint is the final job state.
			if newJob.State == v1beta1.JobStateLost && oldJob.FinalSavepoint {
				newJob.FinalSavepoint = false
			}
			// The job submitter may have failed even though the job execution was successful
			if len(newJob.FailureReasons) == 0 && oldJob.SubmitterExitCode != newJob.SubmitterExitCode && isNonZeroExitCode(newJob.SubmitterExitCode) && observedSubmitter.log != nil {
				newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message)
			}
		}

	}

	// Savepoint: if the observed savepoint completed successfully, record it in the job status.
	if observedSavepoint.status != nil && observedSavepoint.status.IsSuccessful() {
		newJob.SavepointGeneration++
		newJob.SavepointLocation = observedSavepoint.status.Location
		if finalSavepointRequested(newJob.ID, savepoint) {
			newJob.FinalSavepoint = true
		}
		// TODO: SavepointTime should be set from the timestamp generated by the JobManager.
		// Currently the savepoint completion timestamp is not included in the savepoints API
		// response, whereas the checkpoints API returns latest_ack_timestamp.
		// Note: https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-checkpoints-details-checkpointid
		util.SetTimestamp(&newJob.SavepointTime)
	}

	return newJob
}
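
For reference, getFlinkJobDeploymentState (called above) translates the job state string reported by the Flink REST API into the operator's v1beta1 job state. The helper below is only a minimal sketch of such a mapping, assuming Flink's documented job state strings (FINISHED, FAILED, CANCELED, RUNNING, and so on); it is not the operator's actual implementation, and the name sketchFlinkJobDeploymentState is hypothetical.

// sketchFlinkJobDeploymentState is a hypothetical illustration only: it maps a
// Flink REST API job state string to the operator's JobState. The real
// getFlinkJobDeploymentState in the operator may handle a different set of states.
func sketchFlinkJobDeploymentState(flinkJobState string) v1beta1.JobState {
	switch flinkJobState {
	case "FINISHED":
		return v1beta1.JobStateSucceeded
	case "FAILED":
		return v1beta1.JobStateFailed
	case "CANCELED":
		return v1beta1.JobStateCancelled
	default:
		// INITIALIZING, CREATED, RUNNING, FAILING, CANCELLING, RESTARTING,
		// SUSPENDED and RECONCILING are all treated here as a still-running job.
		return v1beta1.JobStateRunning
	}
}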