in controllers/flinkcluster/flinkcluster_updater.go [102:188]
func (updater *ClusterStatusUpdater) createStatusChangeEvents(
oldStatus v1beta1.FlinkClusterStatus,
newStatus v1beta1.FlinkClusterStatus) {
if oldStatus.Components.JobManager != nil &&
newStatus.Components.JobManager != nil &&
oldStatus.Components.JobManager.State != newStatus.Components.JobManager.State {
updater.createStatusChangeEvent(
"JobManager StatefulSet",
oldStatus.Components.JobManager.State,
newStatus.Components.JobManager.State)
}
// ConfigMap.
if oldStatus.Components.ConfigMap != nil &&
newStatus.Components.ConfigMap != nil &&
oldStatus.Components.ConfigMap.State !=
newStatus.Components.ConfigMap.State {
updater.createStatusChangeEvent(
"ConfigMap",
oldStatus.Components.ConfigMap.State,
newStatus.Components.ConfigMap.State)
}
// JobManager service.
if oldStatus.Components.JobManagerService.State !=
newStatus.Components.JobManagerService.State {
updater.createStatusChangeEvent(
"JobManager service",
oldStatus.Components.JobManagerService.State,
newStatus.Components.JobManagerService.State)
}
// JobManager ingress.
if oldStatus.Components.JobManagerIngress == nil && newStatus.Components.JobManagerIngress != nil {
updater.createStatusEvent(
"JobManager ingress",
newStatus.Components.JobManagerIngress.State)
}
if oldStatus.Components.JobManagerIngress != nil && newStatus.Components.JobManagerIngress != nil &&
oldStatus.Components.JobManagerIngress.State != newStatus.Components.JobManagerIngress.State {
updater.createStatusChangeEvent(
"JobManager ingress",
oldStatus.Components.JobManagerIngress.State,
newStatus.Components.JobManagerIngress.State)
}
// TaskManager Statefulset/Deployment.
if oldStatus.Components.TaskManager != nil &&
newStatus.Components.TaskManager != nil &&
oldStatus.Components.TaskManager.State !=
newStatus.Components.TaskManager.State {
updater.createStatusChangeEvent(
"TaskManager",
oldStatus.Components.TaskManager.State,
newStatus.Components.TaskManager.State)
}
// Job.
if oldStatus.Components.Job == nil && newStatus.Components.Job != nil {
updater.createStatusEvent("Job", newStatus.Components.Job.State)
}
if oldStatus.Components.Job != nil && newStatus.Components.Job != nil &&
oldStatus.Components.Job.State != newStatus.Components.Job.State {
updater.createStatusChangeEvent(
"Job",
oldStatus.Components.Job.State,
newStatus.Components.Job.State)
}
// Cluster.
if oldStatus.State != newStatus.State {
updater.createStatusChangeEvent("Cluster", oldStatus.State, newStatus.State)
}
// Savepoint.
if newStatus.Savepoint != nil && !reflect.DeepEqual(oldStatus.Savepoint, newStatus.Savepoint) {
eventType, eventReason, eventMessage := getSavepointEvent(*newStatus.Savepoint)
updater.recorder.Event(updater.observed.cluster, eventType, eventReason, eventMessage)
}
// Control.
if newStatus.Control != nil && !reflect.DeepEqual(oldStatus.Control, newStatus.Control) {
eventType, eventReason, eventMessage := getControlEvent(*newStatus.Control)
updater.recorder.Event(updater.observed.cluster, eventType, eventReason, eventMessage)
}
}