in controllers/flinkcluster/flinkcluster_util.go [362:406]
func isComponentUpdated(component client.Object, cluster *v1beta1.FlinkCluster) bool {
if !cluster.Status.Revision.IsUpdateTriggered() {
return true
}
switch o := component.(type) {
case *appsv1.Deployment:
if o == nil {
return false
}
case *appsv1.StatefulSet:
if o == nil {
return false
}
case *corev1.ConfigMap:
if o == nil {
return false
}
case *policyv1.PodDisruptionBudget:
if o == nil {
return false
}
case *corev1.Service:
if o == nil {
return false
}
case *batchv1.Job:
if o == nil {
return cluster.Spec.Job == nil
}
case *networkingv1.Ingress:
if o == nil {
jm := cluster.Spec.JobManager
return jm == nil || jm.Ingress == nil
}
case *autoscalingv2.HorizontalPodAutoscaler:
if o == nil {
return false
}
}
labels := component.GetLabels()
nextRevisionName := getNextRevisionName(&cluster.Status.Revision)
return labels[RevisionNameLabel] == nextRevisionName
}