in controllers/flinkcluster/flinkcluster_reconciler.go [744:796]
func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReason {
var observed = reconciler.observed
var cluster = observed.cluster
var jobSpec = observed.cluster.Spec.Job
var job = observed.cluster.Status.Components.Job
var savepoint = observed.cluster.Status.Savepoint
var newRequestedControl = getNewControlRequest(cluster)
if !canTakeSavepoint(reconciler.observed.cluster) {
return ""
}
// Savepoint trigger priority is user request including update and job stop.
switch {
// TODO: spec.job.cancelRequested will be deprecated
// Should stop job with savepoint by user requested control
case newRequestedControl == v1beta1.ControlNameJobCancel || (jobSpec.CancelRequested != nil && *jobSpec.CancelRequested):
return v1beta1.SavepointReasonJobCancel
// Take savepoint by user request
case newRequestedControl == v1beta1.ControlNameSavepoint:
fallthrough
// TODO: spec.job.savepointGeneration will be deprecated
case jobSpec.SavepointGeneration > job.SavepointGeneration:
// Triggered by savepointGeneration increased.
// When previous savepoint is failed, savepoint trigger by spec.job.savepointGeneration is not possible
// because the field cannot be increased more.
// Note: checkSavepointGeneration in flinkcluster_validate.go
return v1beta1.SavepointReasonUserRequested
// Scheduled auto savepoint
case jobSpec.AutoSavepointSeconds != nil:
// When previous try was failed, check retry interval.
if savepoint.IsFailed() && savepoint.TriggerReason == v1beta1.SavepointReasonScheduled {
var nextRetryTime = util.GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second)
if time.Now().After(nextRetryTime) {
return v1beta1.SavepointReasonScheduled
} else {
return ""
}
}
// Check if next trigger time arrived.
var compareTime string
if len(job.SavepointTime) == 0 {
compareTime = job.StartTime
} else {
compareTime = job.SavepointTime
}
var nextTime = getTimeAfterAddedSeconds(compareTime, int64(*jobSpec.AutoSavepointSeconds))
if time.Now().After(nextTime) {
return v1beta1.SavepointReasonScheduled
}
}
return ""
}