in controllers/flinkcluster/flinkcluster_converter.go [1071:1102]
// shouldCleanup reports whether the named component is subject to cleanup
// under the cleanup policy configured in the cluster's job spec.
//
// It returns false when the cluster has no job status (treated as a session
// cluster), when an update has been triggered on the cluster revision, or
// when the job state is not one of the states handled below. Otherwise the
// cleanup action configured for the current job state decides the result:
// DeleteCluster applies to every component, DeleteTaskManager applies only
// to the "TaskManager" component, and any other action yields false.
func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
	job := cluster.Status.Components.Job

	// A nil job status means a session cluster; a triggered update also
	// suppresses cleanup. The nil check is evaluated first.
	if job == nil || cluster.Status.Revision.IsUpdateTriggered() {
		return false
	}

	// Pick the policy entry matching the job state. The policy fields are
	// read only inside the matching case, never for unhandled states.
	var selected v1beta1.CleanupAction
	switch job.State {
	case v1beta1.JobStateSucceeded:
		selected = cluster.Spec.Job.CleanupPolicy.AfterJobSucceeds
	case v1beta1.JobStateFailed,
		v1beta1.JobStateLost,
		v1beta1.JobStateDeployFailed:
		selected = cluster.Spec.Job.CleanupPolicy.AfterJobFails
	case v1beta1.JobStateCancelled:
		selected = cluster.Spec.Job.CleanupPolicy.AfterJobCancelled
	default:
		return false
	}

	deleteAll := selected == v1beta1.CleanupActionDeleteCluster
	deleteTaskManager := selected == v1beta1.CleanupActionDeleteTaskManager &&
		component == "TaskManager"
	return deleteAll || deleteTaskManager
}