in controllers/flinkcluster/flinkcluster_converter.go [77:137]
func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredClusterState {
state := &model.DesiredClusterState{}
cluster := observed.cluster
// The cluster has been deleted, all resources should be cleaned up.
if cluster == nil {
return state
}
jobSpec := cluster.Spec.Job
applicationMode := IsApplicationModeCluster(cluster)
if !shouldCleanup(cluster, "ConfigMap") {
state.ConfigMap = newConfigMap(cluster)
}
if !shouldCleanup(cluster, "PodDisruptionBudget") {
state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
}
if !shouldCleanup(cluster, "HorizontalPodAutoscaler") {
state.HorizontalPodAutoscaler = newHorizontalPodAutoscaler(cluster)
}
if !shouldCleanup(cluster, "JobManager") && !applicationMode {
state.JmStatefulSet = newJobManagerStatefulSet(cluster)
}
if !shouldCleanup(cluster, "TaskManager") {
switch cluster.Spec.TaskManager.DeploymentType {
case v1beta1.DeploymentTypeStatefulSet:
state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
case v1beta1.DeploymentTypeDeployment:
state.TmDeployment = newTaskManagerDeployment(cluster)
}
}
if !shouldCleanup(cluster, "TaskManagerService") {
state.TmService = newTaskManagerService(cluster)
}
if !shouldCleanup(cluster, "JobManagerService") {
state.JmService = newJobManagerService(cluster)
}
if !shouldCleanup(cluster, "JobManagerIngress") {
state.JmIngress = newJobManagerIngress(cluster)
}
if jobSpec != nil {
jobStatus := cluster.Status.Components.Job
keepJobState := (shouldStopJob(cluster) || jobStatus.IsStopped()) &&
(!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) &&
shouldCleanup(cluster, "Job")
if !keepJobState {
state.Job = newJob(cluster)
}
}
return state
}