in controllers/flinkcluster/flinkcluster_updater.go [985:1042]
func (updater *ClusterStatusUpdater) deriveSavepointStatus(
observedSavepoint *Savepoint,
recordedSavepointStatus *v1beta1.SavepointStatus,
newJobStatus *v1beta1.JobStatus,
flinkJobID *string) *v1beta1.SavepointStatus {
if recordedSavepointStatus == nil {
return nil
}
// Derived savepoint status to return
var s = recordedSavepointStatus.DeepCopy()
var errMsg string
// Update the savepoint status when observed savepoint is found.
if s.State == v1beta1.SavepointStateInProgress {
// Derive the state from the observed savepoint in JobManager.
status := observedSavepoint.status
switch {
case status != nil && status.IsSuccessful():
s.State = v1beta1.SavepointStateSucceeded
case status != nil && status.IsFailed():
s.State = v1beta1.SavepointStateFailed
errMsg = fmt.Sprintf("Savepoint error: %v", observedSavepoint.status.FailureCause.StackTrace)
case observedSavepoint.error != nil:
s.State = v1beta1.SavepointStateFailed
errMsg = fmt.Sprintf("Failed to get savepoint status: %v", observedSavepoint.error)
}
// Derive the failure state from Flink job status.
// Append additional error message if it already exists.
if s.State == v1beta1.SavepointStateFailed {
switch {
case newJobStatus.IsStopped():
errMsg = "Flink job is stopped: " + errMsg
s.State = v1beta1.SavepointStateFailed
case flinkJobID == nil || *flinkJobID != recordedSavepointStatus.JobID:
errMsg = "Savepoint triggered Flink job is lost: " + errMsg
s.State = v1beta1.SavepointStateFailed
}
}
}
// TODO: Record event or introduce Condition in CRD status to notify update state pended.
// https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376
// Make up message.
if errMsg != "" {
if s.TriggerReason == v1beta1.SavepointReasonUpdate {
errMsg =
"Failed to take savepoint for update. " +
"The update process is being postponed until a savepoint is available. " + errMsg
}
if len(errMsg) > 1024 {
errMsg = errMsg[:1024]
}
s.Message = errMsg
}
return s
}