func shouldCleanup()

in controllers/flinkcluster/flinkcluster_converter.go [1071:1102]


// shouldCleanup reports whether the given component should be cleaned up,
// based on the job's current state and the cleanup policy in the job spec.
//
// It returns false when the cluster has no job status (session cluster),
// when a revision update has been triggered, or when the job state is not
// one of the states handled below. Otherwise the cleanup action configured
// for that state decides: DeleteCluster applies to every component, while
// DeleteTaskManager applies only when component is "TaskManager".
func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
	job := cluster.Status.Components.Job
	// No job status means a session cluster; an update in progress also
	// suppresses cleanup.
	if job == nil || cluster.Status.Revision.IsUpdateTriggered() {
		return false
	}

	// Pick the configured action for the state the job ended in. The policy
	// is read only inside the matching case so that unhandled states never
	// touch the job spec.
	var chosen v1beta1.CleanupAction
	switch state := job.State; {
	case state == v1beta1.JobStateSucceeded:
		chosen = cluster.Spec.Job.CleanupPolicy.AfterJobSucceeds
	case state == v1beta1.JobStateFailed,
		state == v1beta1.JobStateLost,
		state == v1beta1.JobStateDeployFailed:
		chosen = cluster.Spec.Job.CleanupPolicy.AfterJobFails
	case state == v1beta1.JobStateCancelled:
		chosen = cluster.Spec.Job.CleanupPolicy.AfterJobCancelled
	default:
		return false
	}

	wholeCluster := chosen == v1beta1.CleanupActionDeleteCluster
	taskManagerOnly := chosen == v1beta1.CleanupActionDeleteTaskManager
	return wholeCluster || (taskManagerOnly && component == "TaskManager")
}