in controllers/flinkcluster/flinkcluster_observer.go [642:721]
// syncRevisionStatus reconciles the ControllerRevision history for the observed
// cluster and stores the result in observed.revision.
//
// It builds a candidate revision from the current cluster and then picks the
// "next" revision in one of three ways:
//   - the newest stored revision, when it is equivalent to the candidate;
//   - an older equivalent revision whose revision number is bumped (rollback);
//   - a newly created ControllerRevision, when no equivalent one exists.
//
// The "current" revision is the next revision when the status has no current
// revision recorded yet; otherwise it is looked up by name among the observed
// revisions. Finally the history is truncated via truncateHistory.
//
// It is a no-op when observed.cluster is nil. It returns any error reported by
// the revision helpers, or an error when a current revision is recorded in the
// status but no matching ControllerRevision is among the observed revisions.
func (observer *ClusterStateObserver) syncRevisionStatus(observed *ObservedClusterState) error {
	if observed.cluster == nil {
		return nil
	}

	var (
		cluster           = observed.cluster
		revisions         = observed.revisions
		recorded          = cluster.Status
		controllerHistory = observer.history

		currentRevision, nextRevision *appsv1.ControllerRevision
	)

	revisionCount := len(revisions)
	// The index arithmetic below relies on this ordering to find the most
	// recent entry at the end of the slice.
	history.SortControllerRevisions(revisions)

	// Work on a local copy of cluster.Status.Revision.CollisionCount: the
	// helpers below receive a pointer to it, and they must not write through
	// to cluster.Status.
	var collisionCount int32
	if recorded.Revision.CollisionCount != nil {
		collisionCount = *recorded.Revision.CollisionCount
	}

	// Build a candidate revision from the current cluster.
	nextRevision, err := newRevision(cluster, util.GetNextRevisionNumber(revisions), &collisionCount)
	if err != nil {
		return err
	}

	// Look for stored revisions equivalent to the candidate.
	equalRevisions := history.FindEqualRevisions(revisions, nextRevision)
	equalCount := len(equalRevisions)

	switch {
	case equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]):
		// The newest stored revision is already equivalent: nothing changed.
		// equalCount > 0 guards the index, since a match implies revisions is
		// non-empty.
		nextRevision = revisions[revisionCount-1]
	case equalCount > 0:
		// An older revision is equivalent: roll back to it by bumping its
		// revision number to the candidate's.
		nextRevision, err = controllerHistory.UpdateControllerRevision(
			equalRevisions[equalCount-1],
			nextRevision.Revision)
		if err != nil {
			return err
		}
	default:
		// No equivalent revision exists: create a new one.
		nextRevision, err = controllerHistory.CreateControllerRevision(cluster, nextRevision, &collisionCount)
		if err != nil {
			return err
		}
	}

	if recorded.Revision.CurrentRevision == "" {
		// No current revision recorded yet: initialize the history with the
		// next revision.
		currentRevision = nextRevision
	} else {
		// Find the stored revision the status refers to.
		for i := range revisions {
			if revisions[i].Name == getCurrentRevisionName(&recorded.Revision) {
				currentRevision = revisions[i]
				break
			}
		}
	}
	if currentRevision == nil {
		return fmt.Errorf("current ControllerRevision resource %q not found", recorded.Revision.CurrentRevision)
	}

	// Publish deep copies so later changes to observed.revision do not alias
	// the entries held in observed.revisions.
	observed.revision = Revision{
		currentRevision: currentRevision.DeepCopy(),
		nextRevision:    nextRevision.DeepCopy(),
		collisionCount:  collisionCount,
	}

	// Maintain the revision history limit.
	return observer.truncateHistory(observed)
}