func getDesiredClusterState()

in controllers/flinkcluster/flinkcluster_converter.go [77:137]


// getDesiredClusterState assembles the desired state of every component of
// the cluster held in observed.
//
// When observed.cluster is nil the cluster has been deleted, so an empty
// DesiredClusterState is returned and all resources should be cleaned up.
// Otherwise each component is populated only if shouldCleanup reports false
// for that component's name; components left nil are not part of the desired
// state.
func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredClusterState {
	desired := &model.DesiredClusterState{}

	fc := observed.cluster
	if fc == nil {
		// The cluster has been deleted, all resources should be cleaned up.
		return desired
	}

	appMode := IsApplicationModeCluster(fc)

	if !shouldCleanup(fc, "ConfigMap") {
		desired.ConfigMap = newConfigMap(fc)
	}
	if !shouldCleanup(fc, "PodDisruptionBudget") {
		desired.PodDisruptionBudget = newPodDisruptionBudget(fc)
	}
	if !shouldCleanup(fc, "HorizontalPodAutoscaler") {
		desired.HorizontalPodAutoscaler = newHorizontalPodAutoscaler(fc)
	}

	// No JobManager StatefulSet is requested for application-mode clusters.
	if !shouldCleanup(fc, "JobManager") && !appMode {
		desired.JmStatefulSet = newJobManagerStatefulSet(fc)
	}

	// The TaskManager workload is either a StatefulSet or a Deployment,
	// chosen by the spec; any other deployment type yields neither.
	if !shouldCleanup(fc, "TaskManager") {
		deployType := fc.Spec.TaskManager.DeploymentType
		if deployType == v1beta1.DeploymentTypeStatefulSet {
			desired.TmStatefulSet = newTaskManagerStatefulSet(fc)
		} else if deployType == v1beta1.DeploymentTypeDeployment {
			desired.TmDeployment = newTaskManagerDeployment(fc)
		}
	}

	if !shouldCleanup(fc, "TaskManagerService") {
		desired.TmService = newTaskManagerService(fc)
	}
	if !shouldCleanup(fc, "JobManagerService") {
		desired.JmService = newJobManagerService(fc)
	}
	if !shouldCleanup(fc, "JobManagerIngress") {
		desired.JmIngress = newJobManagerIngress(fc)
	}

	spec := fc.Spec.Job
	if spec == nil {
		return desired
	}

	status := fc.Status.Components.Job

	// The job is left out of the desired state only when all of these hold:
	// it is being stopped or is already stopped, neither an update nor a
	// restart is pending, and cleanup applies to "Job". The expression below
	// is the negation of that conjunction, with the helpers evaluated in the
	// same order.
	wantJob := !(shouldStopJob(fc) || status.IsStopped()) ||
		shouldUpdateJob(observed) ||
		status.ShouldRestart(spec) ||
		!shouldCleanup(fc, "Job")
	if wantJob {
		desired.Job = newJob(fc)
	}

	return desired
}