in apis/flinkcluster/v1beta1/flinkcluster_types_util.go [79:109]
func (j *JobStatus) UpdateReady(spec *JobSpec, observeTime time.Time) bool {
var takeSavepointOnUpdate = spec.TakeSavepointOnUpdate == nil || *spec.TakeSavepointOnUpdate
switch {
case j == nil:
fallthrough
case !isBlank(spec.FromSavepoint):
return true
case j.IsActive():
// When job is active and takeSavepointOnUpdate is true, only after taking savepoint with final job state,
// proceed job update.
if takeSavepointOnUpdate {
if j.FinalSavepoint {
return true
}
} else if j.IsSavepointUpToDate(spec, observeTime) {
return true
}
case j.State == JobStateUpdating && !takeSavepointOnUpdate:
return true
default:
// In other cases, check if savepoint is up-to-date compared to job end time.
var jobCompletionTime time.Time
if !j.CompletionTime.IsZero() {
jobCompletionTime = j.CompletionTime.Time
}
if j.IsSavepointUpToDate(spec, jobCompletionTime) {
return true
}
}
return false
}