in controllers/flinkcluster/flinkcluster_reconciler.go [852:902]
func (reconciler *ClusterReconciler) updateStatus(
ctx context.Context, ss **v1beta1.SavepointStatus, cs **v1beta1.FlinkClusterControlStatus) {
log := logr.FromContextOrDiscard(ctx)
var savepointStatus = *ss
var controlStatus = *cs
if savepointStatus == nil && controlStatus == nil {
return
}
// Record events
if savepointStatus != nil {
eventType, eventReason, eventMessage := getSavepointEvent(*savepointStatus)
reconciler.recorder.Event(reconciler.observed.cluster, eventType, eventReason, eventMessage)
}
if controlStatus != nil {
eventType, eventReason, eventMessage := getControlEvent(*controlStatus)
reconciler.recorder.Event(reconciler.observed.cluster, eventType, eventReason, eventMessage)
}
// Update status
var clusterClone = reconciler.observed.cluster.DeepCopy()
var statusUpdateErr error
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var newStatus = &clusterClone.Status
if savepointStatus != nil {
newStatus.Savepoint = savepointStatus
}
if controlStatus != nil {
newStatus.Control = controlStatus
}
util.SetTimestamp(&newStatus.LastUpdateTime)
log.Info("Updating cluster status", "clusterClone", clusterClone, "newStatus", newStatus)
statusUpdateErr = reconciler.k8sClient.Status().Update(ctx, clusterClone)
if statusUpdateErr == nil {
return nil
}
var clusterUpdated v1beta1.FlinkCluster
if err := reconciler.k8sClient.Get(
ctx,
types.NamespacedName{Namespace: clusterClone.Namespace, Name: clusterClone.Name}, &clusterUpdated); err == nil {
clusterClone = clusterUpdated.DeepCopy()
}
return statusUpdateErr
})
if statusUpdateErr != nil {
log.Error(
statusUpdateErr, "Failed to update status.", "error", statusUpdateErr)
}
}