in controllers/flinkcluster/flinkcluster_updater.go [1044:1105]
func deriveControlStatus(
cluster *v1beta1.FlinkCluster,
newSavepoint *v1beta1.SavepointStatus,
newJob *v1beta1.JobStatus,
recordedControl *v1beta1.FlinkClusterControlStatus) *v1beta1.FlinkClusterControlStatus {
var controlRequest = getNewControlRequest(cluster)
// Derived control status to return
var c *v1beta1.FlinkClusterControlStatus
// New control status
if controlStatusChanged(cluster, controlRequest) {
c = getControlStatus(controlRequest, v1beta1.ControlStateRequested)
return c
}
// Update control status in progress.
if recordedControl != nil && recordedControl.State == v1beta1.ControlStateInProgress {
c = recordedControl.DeepCopy()
switch recordedControl.Name {
case v1beta1.ControlNameJobCancel:
switch {
case newJob.State == v1beta1.JobStateCancelled:
if newSavepoint != nil {
if newSavepoint.State == v1beta1.SavepointStateSucceeded {
c.State = v1beta1.ControlStateSucceeded
} else if newSavepoint.IsFailed() && newSavepoint.TriggerReason == v1beta1.SavepointReasonJobCancel {
c.Message = "Aborted job cancellation: failed to take savepoint."
c.State = v1beta1.ControlStateFailed
}
} else {
c.State = v1beta1.ControlStateSucceeded
}
case newJob.IsStopped():
c.Message = "Aborted job cancellation: job is stopped already."
c.State = v1beta1.ControlStateFailed
}
case v1beta1.ControlNameSavepoint:
if newSavepoint == nil {
c.Message = "Aborted: savepoint not defined"
c.State = v1beta1.ControlStateFailed
} else if newSavepoint.State == v1beta1.SavepointStateSucceeded {
c.State = v1beta1.ControlStateSucceeded
} else if newSavepoint.IsFailed() && newSavepoint.TriggerReason == v1beta1.SavepointReasonUserRequested {
c.State = v1beta1.ControlStateFailed
}
}
// Update time when state changed.
if c.State != v1beta1.ControlStateInProgress {
util.SetTimestamp(&c.UpdateTime)
}
return c
}
// Maintain control status if there is no change.
if recordedControl != nil && c == nil {
c = recordedControl.DeepCopy()
return c
}
return nil
}