func()

in controllers/flinkcluster/flinkcluster_updater.go [803:930]


// isStatusChanged reports whether newStatus differs from currentStatus in any
// of the fields compared below. Every detected difference is logged through
// the logger carried by ctx (a discarding logger is used when ctx has none).
//
// Compared fields, in order: State, Control, the ConfigMap, JobManager,
// JobManagerService, JobManagerIngress, TaskManager and Job components,
// Savepoint, and Revision (CurrentRevision, NextRevision, CollisionCount).
// JobManagerIngress is compared by its State field only; the other components
// are compared with reflect.DeepEqual.
//
// Optional (pointer) fields are compared nil-safely: a transition between nil
// and non-nil in either direction counts as a change and never dereferences
// the nil side.
func (updater *ClusterStatusUpdater) isStatusChanged(
	ctx context.Context,
	currentStatus v1beta1.FlinkClusterStatus,
	newStatus v1beta1.FlinkClusterStatus) bool {
	log := logr.FromContextOrDiscard(ctx)

	changed := false
	if newStatus.State != currentStatus.State {
		changed = true
		log.Info(
			"Cluster state changed",
			"current",
			currentStatus.State,
			"new",
			newStatus.State)
	}
	if !reflect.DeepEqual(newStatus.Control, currentStatus.Control) {
		log.Info(
			"Control status changed", "current",
			currentStatus.Control,
			"new",
			newStatus.Control)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.ConfigMap, currentStatus.Components.ConfigMap) {
		log.Info(
			"ConfigMap status changed",
			"current",
			currentStatus.Components.ConfigMap,
			"new",
			newStatus.Components.ConfigMap)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.JobManager, currentStatus.Components.JobManager) {
		log.Info(
			"JobManager StatefulSet status changed",
			"current", currentStatus.Components.JobManager,
			"new",
			newStatus.Components.JobManager)
		changed = true
	}
	if !reflect.DeepEqual(newStatus.Components.JobManagerService, currentStatus.Components.JobManagerService) {
		log.Info(
			"JobManager service status changed",
			"current",
			currentStatus.Components.JobManagerService,
			"new", newStatus.Components.JobManagerService)
		changed = true
	}

	// The ingress status is optional on both sides, so every nil/non-nil
	// combination is handled explicitly before touching the State field.
	curIngress := currentStatus.Components.JobManagerIngress
	newIngress := newStatus.Components.JobManagerIngress
	switch {
	case curIngress == nil && newIngress == nil:
		// Absent before and after: nothing changed.
	case curIngress == nil:
		log.Info(
			"JobManager ingress status changed",
			"current",
			"nil",
			"new", *newIngress)
		changed = true
	case newIngress == nil:
		// Ingress status was removed; previously this case dereferenced nil.
		log.Info(
			"JobManager ingress status changed",
			"current",
			*curIngress,
			"new",
			"nil")
		changed = true
	case newIngress.State != curIngress.State:
		log.Info(
			"JobManager ingress status changed",
			"current",
			*curIngress,
			"new",
			*newIngress)
		changed = true
	}

	if !reflect.DeepEqual(newStatus.Components.TaskManager, currentStatus.Components.TaskManager) {
		log.Info(
			"TaskManager StatefulSet status changed",
			"current",
			currentStatus.Components.TaskManager,
			"new",
			newStatus.Components.TaskManager)
		changed = true
	}

	// The job status is optional on both sides as well.
	curJob := currentStatus.Components.Job
	newJob := newStatus.Components.Job
	switch {
	case curJob == nil && newJob == nil:
		// Absent before and after: nothing changed.
	case curJob == nil:
		log.Info(
			"Job status changed",
			"current",
			"nil",
			"new",
			*newJob)
		changed = true
	case newJob == nil:
		// Job status was removed; log it like every other change.
		log.Info(
			"Job status changed",
			"current",
			*curJob,
			"new",
			"nil")
		changed = true
	case !reflect.DeepEqual(newJob, curJob):
		log.Info(
			"Job status changed",
			"current",
			*curJob,
			"new",
			*newJob)
		changed = true
	}

	if !reflect.DeepEqual(newStatus.Savepoint, currentStatus.Savepoint) {
		log.Info(
			"Savepoint status changed", "current",
			currentStatus.Savepoint,
			"new",
			newStatus.Savepoint)
		changed = true
	}

	nr := newStatus.Revision     // New revision status
	cr := currentStatus.Revision // Current revision status

	// CollisionCount is a pointer: it changed when exactly one side is nil, or
	// when both are set and the pointed-to values differ.
	collisionChanged := (nr.CollisionCount == nil) != (cr.CollisionCount == nil)
	if !collisionChanged && nr.CollisionCount != nil {
		collisionChanged = *nr.CollisionCount != *cr.CollisionCount
	}

	if nr.CurrentRevision != cr.CurrentRevision ||
		nr.NextRevision != cr.NextRevision ||
		collisionChanged {
		// Log the counter values rather than pointer addresses; a nil
		// interface still renders as "<nil>" with %v.
		var crCount, nrCount interface{}
		if cr.CollisionCount != nil {
			crCount = *cr.CollisionCount
		}
		if nr.CollisionCount != nil {
			nrCount = *nr.CollisionCount
		}
		log.Info(
			"FlinkCluster revision status changed", "current",
			fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", cr.CurrentRevision, cr.NextRevision, crCount),
			"new",
			fmt.Sprintf("currentRevision: %v, nextRevision: %v, collisionCount: %v", nr.CurrentRevision, nr.NextRevision, nrCount))
		changed = true
	}
	return changed
}