func()

in controllers/flinkcluster/flinkcluster_reconciler.go [744:796]


func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReason {
	var observed = reconciler.observed
	var cluster = observed.cluster
	var jobSpec = observed.cluster.Spec.Job
	var job = observed.cluster.Status.Components.Job
	var savepoint = observed.cluster.Status.Savepoint
	var newRequestedControl = getNewControlRequest(cluster)

	if !canTakeSavepoint(reconciler.observed.cluster) {
		return ""
	}

	// Savepoint trigger priority is user request including update and job stop.
	switch {
	// TODO: spec.job.cancelRequested will be deprecated
	// Should stop job with savepoint by user requested control
	case newRequestedControl == v1beta1.ControlNameJobCancel || (jobSpec.CancelRequested != nil && *jobSpec.CancelRequested):
		return v1beta1.SavepointReasonJobCancel
	// Take savepoint by user request
	case newRequestedControl == v1beta1.ControlNameSavepoint:
		fallthrough
	// TODO: spec.job.savepointGeneration will be deprecated
	case jobSpec.SavepointGeneration > job.SavepointGeneration:
		// Triggered by savepointGeneration increased.
		// When previous savepoint is failed, savepoint trigger by spec.job.savepointGeneration is not possible
		// because the field cannot be increased more.
		// Note: checkSavepointGeneration in flinkcluster_validate.go
		return v1beta1.SavepointReasonUserRequested
	// Scheduled auto savepoint
	case jobSpec.AutoSavepointSeconds != nil:
		// When previous try was failed, check retry interval.
		if savepoint.IsFailed() && savepoint.TriggerReason == v1beta1.SavepointReasonScheduled {
			var nextRetryTime = util.GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second)
			if time.Now().After(nextRetryTime) {
				return v1beta1.SavepointReasonScheduled
			} else {
				return ""
			}
		}
		// Check if next trigger time arrived.
		var compareTime string
		if len(job.SavepointTime) == 0 {
			compareTime = job.StartTime
		} else {
			compareTime = job.SavepointTime
		}
		var nextTime = getTimeAfterAddedSeconds(compareTime, int64(*jobSpec.AutoSavepointSeconds))
		if time.Now().After(nextTime) {
			return v1beta1.SavepointReasonScheduled
		}
	}
	return ""
}