in controllers/flinkcluster/flinkcluster_updater.go [803:930]
// isStatusChanged reports whether newStatus differs from currentStatus in any
// of the fields compared below, logging each difference through the logger
// carried in ctx (logging is discarded if ctx carries none).
//
// Compared fields: State, Control, the ConfigMap / JobManager /
// JobManagerService / JobManagerIngress / TaskManager / Job component
// statuses, Savepoint, and the Revision status (CurrentRevision, NextRevision
// and CollisionCount).
//
// Optional (pointer) fields are compared nil-safely: a transition between nil
// and non-nil in either direction counts as a change. When both ingress
// statuses are present only their State is compared.
func (updater *ClusterStatusUpdater) isStatusChanged(
	ctx context.Context,
	currentStatus v1beta1.FlinkClusterStatus,
	newStatus v1beta1.FlinkClusterStatus) bool {
	log := logr.FromContextOrDiscard(ctx)
	changed := false

	if newStatus.State != currentStatus.State {
		log.Info(
			"Cluster state changed",
			"current", currentStatus.State,
			"new", newStatus.State)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Control, currentStatus.Control) {
		log.Info(
			"Control status changed",
			"current", currentStatus.Control,
			"new", newStatus.Control)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.ConfigMap, currentStatus.Components.ConfigMap) {
		log.Info(
			"ConfigMap status changed",
			"current", currentStatus.Components.ConfigMap,
			"new", newStatus.Components.ConfigMap)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.JobManager, currentStatus.Components.JobManager) {
		log.Info(
			"JobManager StatefulSet status changed",
			"current", currentStatus.Components.JobManager,
			"new", newStatus.Components.JobManager)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.JobManagerService, currentStatus.Components.JobManagerService) {
		log.Info(
			"JobManager service status changed",
			"current", currentStatus.Components.JobManagerService,
			"new", newStatus.Components.JobManagerService)
		changed = true
	}

	// JobManager ingress is optional; handle every nil/non-nil combination so
	// that a removed ingress is reported as a change instead of panicking.
	curIngress := currentStatus.Components.JobManagerIngress
	newIngress := newStatus.Components.JobManagerIngress
	switch {
	case curIngress == nil && newIngress == nil:
		// Absent on both sides: nothing to compare.
	case curIngress == nil:
		log.Info(
			"JobManager ingress status changed",
			"current", "nil",
			"new", *newIngress)
		changed = true
	case newIngress == nil:
		log.Info(
			"JobManager ingress status changed",
			"current", *curIngress,
			"new", "nil")
		changed = true
	case newIngress.State != curIngress.State:
		// Only the State field is compared when both are present.
		log.Info(
			"JobManager ingress status changed",
			"current", *curIngress,
			"new", *newIngress)
		changed = true
	}

	if !reflect.DeepEqual(newStatus.Components.TaskManager, currentStatus.Components.TaskManager) {
		log.Info(
			"TaskManager StatefulSet status changed",
			"current", currentStatus.Components.TaskManager,
			"new", newStatus.Components.TaskManager)
		changed = true
	}

	// Job status is optional as well; same nil handling as the ingress.
	curJob := currentStatus.Components.Job
	newJob := newStatus.Components.Job
	switch {
	case curJob == nil && newJob == nil:
		// Absent on both sides: nothing to compare.
	case curJob == nil:
		log.Info(
			"Job status changed",
			"current", "nil",
			"new", *newJob)
		changed = true
	case newJob == nil:
		log.Info(
			"Job status changed",
			"current", *curJob,
			"new", "nil")
		changed = true
	case !reflect.DeepEqual(newJob, curJob):
		log.Info(
			"Job status changed",
			"current", *curJob,
			"new", *newJob)
		changed = true
	}

	if !reflect.DeepEqual(newStatus.Savepoint, currentStatus.Savepoint) {
		log.Info(
			"Savepoint status changed",
			"current", currentStatus.Savepoint,
			"new", newStatus.Savepoint)
		changed = true
	}

	nr := newStatus.Revision     // New revision status
	cr := currentStatus.Revision // Current revision status

	// CollisionCount is a pointer: it changed if exactly one side is nil, or
	// if both are set and the pointed-to values differ.
	collisionCountChanged := (nr.CollisionCount == nil) != (cr.CollisionCount == nil) ||
		(nr.CollisionCount != nil && *nr.CollisionCount != *cr.CollisionCount)

	if nr.CurrentRevision != cr.CurrentRevision ||
		nr.NextRevision != cr.NextRevision ||
		collisionCountChanged {
		// Log the pointed-to counts rather than the pointer addresses.
		var crCount, nrCount interface{} = "nil", "nil"
		if cr.CollisionCount != nil {
			crCount = *cr.CollisionCount
		}
		if nr.CollisionCount != nil {
			nrCount = *nr.CollisionCount
		}
		log.Info(
			"FlinkCluster revision status changed",
			"current",
			fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", cr.CurrentRevision, cr.NextRevision, crCount),
			"new",
			fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", nr.CurrentRevision, nr.NextRevision, nrCount))
		changed = true
	}
	return changed
}