in controllers/flinkcluster/flinkcluster_updater.go [623:801]
// deriveJobStatus derives the next JobStatus for the observed cluster by
// reconciling three inputs: the previously recorded status, the state of the
// job-submitter batch Job (and its pod), and the job as reported by the Flink
// REST API. It returns nil when the cluster spec declares no job. The returned
// status is always a fresh object; the recorded status is never mutated.
//
// NOTE(review): the state-derivation switch below relies on case ORDER and on
// fallthrough semantics — do not reorder cases.
func (updater *ClusterStatusUpdater) deriveJobStatus(ctx context.Context) *v1beta1.JobStatus {
	log := logr.FromContextOrDiscard(ctx)
	var observed = updater.observed
	var observedCluster = observed.cluster
	var jobSpec = observedCluster.Spec.Job
	if jobSpec == nil {
		// Session cluster: no job component to track.
		return nil
	}
	var observedSubmitter = observed.flinkJobSubmitter
	var observedFlinkJob = observed.flinkJob.status
	var observedSavepoint = observed.savepoint
	var recorded = observedCluster.Status
	var savepoint = recorded.Savepoint
	var oldJob = recorded.Components.Job
	var newJob *v1beta1.JobStatus
	// Start from a deep copy of the recorded job status so that mutations
	// below cannot leak into the recorded cluster status.
	if oldJob != nil {
		newJob = oldJob.DeepCopy()
	} else {
		newJob = new(v1beta1.JobStatus)
	}
	if observedSubmitter.job != nil {
		newJob.SubmitterName = observedSubmitter.job.Name
		exitCode, _ := updater.deriveJobSubmitterExitCodeAndReason(observed.flinkJobSubmitter.pod, observed.flinkJobSubmitter.job)
		newJob.SubmitterExitCode = exitCode
	} else {
		// Fix: the original `else if observedSubmitter.job == nil || pod == nil`
		// guard was always true inside this else branch (job == nil is implied),
		// so a plain else is equivalent and removes the misleading dead condition.
		// The submitter is gone, so the exit code must not stay at the
		// "still running" sentinel (-1).
		if oldJob != nil && oldJob.SubmitterExitCode == -1 {
			newJob.SubmitterExitCode = 0
		}
	}
	// Derive the new job state. Case order matters; several cases fall through.
	var newJobState v1beta1.JobState
	switch {
	case oldJob == nil:
		// First reconciliation for this job.
		newJobState = v1beta1.JobStatePending
	case shouldUpdateJob(&observed):
		newJobState = v1beta1.JobStateUpdating
	case oldJob.IsStopped():
		// When a new job is deploying, update the job state to deploying.
		if observedSubmitter.job != nil && (observedSubmitter.job.Status.Active == 1 || isJobInitialising(observedSubmitter.job.Status)) {
			newJobState = v1beta1.JobStateDeploying
		} else {
			newJobState = oldJob.State
		}
	case oldJob.IsPending() && oldJob.DeployTime != "":
		// A deploy time was recorded while pending: the submitter was launched.
		newJobState = v1beta1.JobStateDeploying
	// Derive the job state from the observed Flink job, if it exists.
	case observedFlinkJob != nil:
		if observedFlinkJob.Id != "" {
			newJob.ID = observedFlinkJob.Id
		}
		if observedFlinkJob.Name != "" {
			newJob.Name = observedFlinkJob.Name
		}
		tmpState := getFlinkJobDeploymentState(observedFlinkJob.State)
		// Trust the Flink API state unless the submitter failed, or the job
		// looks finished while the submitter may still be running — in those
		// cases fall through to inspect the submitter outcome below.
		if observedSubmitter.job == nil ||
			(observedSubmitter.job.Status.Failed < 1 && tmpState != v1beta1.JobStateSucceeded) {
			newJobState = tmpState
			break
		}
		log.Info("The submitter maybe still running. Waiting for it")
		fallthrough
	case oldJob.IsActive() && observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 0:
		// Submitter finished: map its batch Job outcome to the job state.
		if observedSubmitter.job.Status.Succeeded == 1 {
			newJobState = v1beta1.JobStateSucceeded
			if newJob.SubmitterExitCode == -1 {
				log.Info("Job succeeded but the exit code is -1. This is an edge case that may " +
					"happen if the controller is down or busy for a long time and the submitter pod is deleted externally " +
					"including by kube-system:pod-garbage-collector. Changing exit code to 0.")
				newJob.SubmitterExitCode = 0
			}
		} else if observedSubmitter.job.Status.Failed == 1 {
			newJobState = v1beta1.JobStateFailed
		} else {
			newJobState = oldJob.State
		}
	case shouldStopJob(observedCluster):
		newJobState = v1beta1.JobStateCancelled
	// When Flink job not found in JobManager or JobManager is unavailable
	case isFlinkAPIReady(observed.flinkJob.list):
		// API answers but the job is not listed: a previously running job is lost.
		if oldJob.State == v1beta1.JobStateRunning {
			newJobState = v1beta1.JobStateLost
			break
		}
		fallthrough
	default:
		// Maintain the job state as recorded if job is not being deployed.
		if oldJob.State != v1beta1.JobStateDeploying {
			newJobState = oldJob.State
			break
		}
		// Job must be in deployment but the submitter not found or tracking failed.
		var jobDeployState = observedSubmitter.getState()
		if observedSubmitter.job == nil || jobDeployState == JobDeployStateUnknown {
			newJobState = v1beta1.JobStateLost
			break
		}
		// Case in which the job submission clearly fails even if it is not confirmed by JobManager
		// Job submitter is deployed but failed.
		if jobDeployState == JobDeployStateFailed {
			newJobState = v1beta1.JobStateDeployFailed
			break
		}
		newJobState = oldJob.State
	}
	// Update State
	newJob.State = newJobState
	// Derive the dependent status fields only on a state transition.
	if oldJob == nil || oldJob.State != newJob.State {
		// TODO: It would be ideal to set the times with the timestamp retrieved from the Flink API like /jobs/{job-id}.
		switch {
		case newJob.IsPending():
			newJob.DeployTime = ""
			switch newJob.State {
			case v1beta1.JobStateUpdating:
				// An update restarts the retry budget.
				newJob.RestartCount = 0
			case v1beta1.JobStateRestarting:
				newJob.RestartCount++
			}
		case newJob.State == v1beta1.JobStateRunning:
			util.SetTimestamp(&newJob.StartTime)
			newJob.CompletionTime = nil
			// When job started, the savepoint is not the final state of the job any more.
			if oldJob.FinalSavepoint {
				newJob.FinalSavepoint = false
			}
		case newJob.IsFailed():
			// Populate failure reasons once, preferring Flink-reported
			// exceptions over the submitter log.
			if len(newJob.FailureReasons) == 0 {
				newJob.FailureReasons = []string{}
				exceptions := observed.flinkJob.exceptions
				if exceptions != nil && len(exceptions.Exceptions) > 0 {
					for _, e := range exceptions.Exceptions {
						newJob.FailureReasons = append(newJob.FailureReasons, e.Exception)
					}
				} else if observedSubmitter.log != nil {
					newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message)
				}
			}
			// Failed is also a stopped state; run the stopped bookkeeping too.
			fallthrough
		case newJob.IsStopped():
			if newJob.CompletionTime.IsZero() {
				now := metav1.Now()
				newJob.CompletionTime = &now
			}
			// When tracking failed, we cannot guarantee if the savepoint is the final job state.
			if newJob.State == v1beta1.JobStateLost && oldJob.FinalSavepoint {
				newJob.FinalSavepoint = false
			}
			// The job submitter may have failed even though the job execution was successful
			if len(newJob.FailureReasons) == 0 && oldJob.SubmitterExitCode != newJob.SubmitterExitCode && isNonZeroExitCode(newJob.SubmitterExitCode) && observedSubmitter.log != nil {
				newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message)
			}
		}
	}
	// Savepoint: record a newly completed savepoint regardless of state changes.
	if observedSavepoint.status != nil && observedSavepoint.status.IsSuccessful() {
		newJob.SavepointGeneration++
		newJob.SavepointLocation = observedSavepoint.status.Location
		if finalSavepointRequested(newJob.ID, savepoint) {
			newJob.FinalSavepoint = true
		}
		// TODO: SavepointTime should be set with the timestamp generated in job manager.
		// Currently savepoint complete timestamp is not included in savepoints API response.
		// Whereas checkpoint API returns the timestamp latest_ack_timestamp.
		// Note: https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-checkpoints-details-checkpointid
		util.SetTimestamp(&newJob.SavepointTime)
	}
	return newJob
}