func()

in controllers/flinkcluster/flinkcluster_updater.go [985:1042]


func (updater *ClusterStatusUpdater) deriveSavepointStatus(
	observedSavepoint *Savepoint,
	recordedSavepointStatus *v1beta1.SavepointStatus,
	newJobStatus *v1beta1.JobStatus,
	flinkJobID *string) *v1beta1.SavepointStatus {
	if recordedSavepointStatus == nil {
		return nil
	}

	// Derived savepoint status to return
	var s = recordedSavepointStatus.DeepCopy()
	var errMsg string

	// Update the savepoint status when observed savepoint is found.
	if s.State == v1beta1.SavepointStateInProgress {
		// Derive the state from the observed savepoint in JobManager.
		status := observedSavepoint.status
		switch {
		case status != nil && status.IsSuccessful():
			s.State = v1beta1.SavepointStateSucceeded
		case status != nil && status.IsFailed():
			s.State = v1beta1.SavepointStateFailed
			errMsg = fmt.Sprintf("Savepoint error: %v", observedSavepoint.status.FailureCause.StackTrace)
		case observedSavepoint.error != nil:
			s.State = v1beta1.SavepointStateFailed
			errMsg = fmt.Sprintf("Failed to get savepoint status: %v", observedSavepoint.error)
		}

		// Derive the failure state from Flink job status.
		// Append additional error message if it already exists.
		if s.State == v1beta1.SavepointStateFailed {
			switch {
			case newJobStatus.IsStopped():
				errMsg = "Flink job is stopped: " + errMsg
				s.State = v1beta1.SavepointStateFailed
			case flinkJobID == nil || *flinkJobID != recordedSavepointStatus.JobID:
				errMsg = "Savepoint triggered Flink job is lost: " + errMsg
				s.State = v1beta1.SavepointStateFailed
			}
		}
	}
	// TODO: Record event or introduce Condition in CRD status to notify update state pended.
	// https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376
	// Make up message.
	if errMsg != "" {
		if s.TriggerReason == v1beta1.SavepointReasonUpdate {
			errMsg =
				"Failed to take savepoint for update. " +
					"The update process is being postponed until a savepoint is available. " + errMsg
		}
		if len(errMsg) > 1024 {
			errMsg = errMsg[:1024]
		}
		s.Message = errMsg
	}

	return s
}