func()

in controllers/flinkcluster/flinkcluster_updater.go [207:566]


// deriveClusterStatus builds a fresh FlinkClusterStatus from the status
// currently recorded on cluster and from the observed state of its components.
//
// For every component (ConfigMap, JobManager StatefulSet, JobManager service,
// optional JobManager ingress, TaskManager StatefulSet or Deployment) the
// resulting state is chosen as follows:
//   - the component is not yet updated while a cluster update should be
//     applied: the recorded status is carried over and marked Updating;
//   - the component is observed: its status is rebuilt from the observed
//     object;
//   - the component is not observed but was recorded: it is marked Deleted.
//
// The cluster state is then derived from the recorded cluster state, the
// number of ready components and the recorded job status, after which the
// job, savepoint, control and revision statuses are derived by their own
// helpers.
//
// The function panics if the recorded cluster state is not one of the states
// handled below.
func (updater *ClusterStatusUpdater) deriveClusterStatus(
	ctx context.Context,
	cluster *v1beta1.FlinkCluster,
	observed *ObservedClusterState) v1beta1.FlinkClusterStatus {
	// Number of components that must be ready for the cluster to count as
	// running; compared against runningComponents below.
	var totalComponents int
	if IsApplicationModeCluster(cluster) {
		// jmService, tmStatefulSet.
		totalComponents = 2
	} else {
		// jmStatefulSet, jmService, tmStatefulSet.
		totalComponents = 3
	}

	var recorded = cluster.Status
	var status = v1beta1.FlinkClusterStatus{}
	var runningComponents = 0

	// ConfigMap.
	var observedConfigMap = observed.configMap
	cmStatus := &status.Components.ConfigMap
	if !isComponentUpdated(observedConfigMap, observed.cluster) && shouldUpdateCluster(observed) {
		*cmStatus = new(v1beta1.ConfigMapStatus)
		// The component may never have been recorded; DeepCopyInto must not
		// be called on a nil receiver.
		if recorded.Components.ConfigMap != nil {
			recorded.Components.ConfigMap.DeepCopyInto(*cmStatus)
		}
		(*cmStatus).State = v1beta1.ComponentStateUpdating
	} else if observedConfigMap != nil {
		*cmStatus = &v1beta1.ConfigMapStatus{
			Name:  observedConfigMap.Name,
			State: v1beta1.ComponentStateReady,
		}
	} else if recorded.Components.ConfigMap != nil {
		*cmStatus = &v1beta1.ConfigMapStatus{
			Name:  recorded.Components.ConfigMap.Name,
			State: v1beta1.ComponentStateDeleted,
		}
	}

	// JobManager StatefulSet (skipped for application mode clusters).
	var observedJmStatefulSet = observed.jmStatefulSet
	jmStatus := &status.Components.JobManager
	if !IsApplicationModeCluster(cluster) {
		if !isComponentUpdated(observedJmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) {
			*jmStatus = new(v1beta1.JobManagerStatus)
			if recorded.Components.JobManager != nil {
				recorded.Components.JobManager.DeepCopyInto(*jmStatus)
			}
			(*jmStatus).State = v1beta1.ComponentStateUpdating
		} else if observedJmStatefulSet != nil {
			*jmStatus = &v1beta1.JobManagerStatus{
				Name:          observedJmStatefulSet.Name,
				State:         getStatefulSetState(observedJmStatefulSet),
				Replicas:      observedJmStatefulSet.Status.Replicas,
				ReadyReplicas: observedJmStatefulSet.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedJmStatefulSet.Status.ReadyReplicas, observedJmStatefulSet.Status.Replicas),
			}
			if (*jmStatus).State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.JobManager != nil {
			*jmStatus = &v1beta1.JobManagerStatus{
				Name:  recorded.Components.JobManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	}

	// JobManager service.
	var observedJmService = observed.jmService
	if !isComponentUpdated(observedJmService, observed.cluster) && shouldUpdateCluster(observed) {
		recorded.Components.JobManagerService.DeepCopyInto(&status.Components.JobManagerService)
		status.Components.JobManagerService.State = v1beta1.ComponentStateUpdating
	} else if observedJmService != nil {
		var nodePort int32
		var loadBalancerIngress []corev1.LoadBalancerIngress
		state := v1beta1.ComponentStateNotReady

		// The readiness criterion depends on the service type; any other
		// type stays NotReady.
		switch observedJmService.Spec.Type {
		case corev1.ServiceTypeClusterIP:
			if observedJmService.Spec.ClusterIP != "" {
				state = v1beta1.ComponentStateReady
				runningComponents++
			}
		case corev1.ServiceTypeLoadBalancer:
			if len(observedJmService.Status.LoadBalancer.Ingress) > 0 {
				state = v1beta1.ComponentStateReady
				runningComponents++
				loadBalancerIngress = observedJmService.Status.LoadBalancer.Ingress
			}
		case corev1.ServiceTypeNodePort:
			if len(observedJmService.Spec.Ports) > 0 {
				state = v1beta1.ComponentStateReady
				runningComponents++
				// Report the node port of the port named "ui".
				for _, port := range observedJmService.Spec.Ports {
					if port.Name == "ui" {
						nodePort = port.NodePort
					}
				}
			}
		}

		status.Components.JobManagerService =
			v1beta1.JobManagerServiceStatus{
				Name:                observedJmService.Name,
				State:               state,
				NodePort:            nodePort,
				LoadBalancerIngress: loadBalancerIngress,
			}
	} else if recorded.Components.JobManagerService.Name != "" {
		status.Components.JobManagerService =
			v1beta1.JobManagerServiceStatus{
				Name:  recorded.Components.JobManagerService.Name,
				State: v1beta1.ComponentStateDeleted,
			}
	}

	// (Optional) JobManager ingress.
	var observedJmIngress = observed.jmIngress
	if !isComponentUpdated(observedJmIngress, observed.cluster) && shouldUpdateCluster(observed) {
		status.Components.JobManagerIngress = &v1beta1.JobManagerIngressStatus{}
		// The ingress is optional, so there may be no recorded status to
		// carry over.
		if recorded.Components.JobManagerIngress != nil {
			recorded.Components.JobManagerIngress.DeepCopyInto(status.Components.JobManagerIngress)
		}
		status.Components.JobManagerIngress.State = v1beta1.ComponentStateUpdating
	} else if observedJmIngress != nil {
		var state v1beta1.ComponentState
		var urls []string
		var loadbalancerReady bool

		useTLS := len(observedJmIngress.Spec.TLS) > 0

		// Collect URLs from the hosts in the ingress spec: TLS hosts when TLS
		// is configured, rule hosts otherwise.
		if useTLS {
			for _, tls := range observedJmIngress.Spec.TLS {
				for _, host := range tls.Hosts {
					if host != "" {
						urls = append(urls, "https://"+host)
					}
				}
			}
		} else {
			for _, rule := range observedJmIngress.Spec.Rules {
				if rule.Host != "" {
					urls = append(urls, "http://"+rule.Host)
				}
			}
		}
		useHost := len(urls) > 0

		// Check loadbalancer is ready.
		for _, ingress := range observedJmIngress.Status.LoadBalancer.Ingress {
			// Get loadbalancer address of this entry only, preferring the
			// hostname over the IP, so that an entry without any address does
			// not reuse the address of a previous entry.
			addr := ingress.Hostname
			if addr == "" {
				addr = ingress.IP
			}
			if addr == "" {
				continue
			}
			// If any ready LB found, state is ready.
			loadbalancerReady = true
			// If ingress spec does not have host, get ip or hostname of loadbalancer.
			if !useHost {
				if useTLS {
					urls = append(urls, "https://"+addr)
				} else {
					urls = append(urls, "http://"+addr)
				}
			}
		}

		// Jobmanager ingress state become ready when LB for ingress is specified.
		if loadbalancerReady {
			state = v1beta1.ComponentStateReady
		} else {
			state = v1beta1.ComponentStateNotReady
		}

		status.Components.JobManagerIngress =
			&v1beta1.JobManagerIngressStatus{
				Name:  observedJmIngress.Name,
				State: state,
				URLs:  urls,
			}
	} else if recorded.Components.JobManagerIngress != nil &&
		recorded.Components.JobManagerIngress.Name != "" {
		status.Components.JobManagerIngress =
			&v1beta1.JobManagerIngressStatus{
				Name:  recorded.Components.JobManagerIngress.Name,
				State: v1beta1.ComponentStateDeleted,
			}
	}

	// TaskManager: a StatefulSet when the deployment type is unset or
	// StatefulSet, a Deployment otherwise.
	labelSelector := labels.SelectorFromSet(getComponentLabels(cluster, "taskmanager"))
	var clusterTmDeploymentType = cluster.Spec.TaskManager.DeploymentType
	if clusterTmDeploymentType == "" || clusterTmDeploymentType == v1beta1.DeploymentTypeStatefulSet {
		// TaskManager StatefulSet.
		var observedTmStatefulSet = observed.tmStatefulSet
		tmStatus := &status.Components.TaskManager
		if !isComponentUpdated(observedTmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) {
			*tmStatus = new(v1beta1.TaskManagerStatus)
			if recorded.Components.TaskManager != nil {
				recorded.Components.TaskManager.DeepCopyInto(*tmStatus)
			}
			(*tmStatus).State = v1beta1.ComponentStateUpdating
		} else if observedTmStatefulSet != nil {
			*tmStatus = &v1beta1.TaskManagerStatus{
				Name:          observedTmStatefulSet.Name,
				State:         getStatefulSetState(observedTmStatefulSet),
				Replicas:      observedTmStatefulSet.Status.Replicas,
				ReadyReplicas: observedTmStatefulSet.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedTmStatefulSet.Status.ReadyReplicas, observedTmStatefulSet.Status.Replicas),
				Selector:      labelSelector.String(),
			}
			if (*tmStatus).State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.TaskManager != nil {
			*tmStatus = &v1beta1.TaskManagerStatus{
				Name:  recorded.Components.TaskManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	} else {
		// TaskManager Deployment.
		var observedTmDeployment = observed.tmDeployment
		tmStatus := &status.Components.TaskManager
		if !isComponentUpdated(observedTmDeployment, observed.cluster) && shouldUpdateCluster(observed) {
			*tmStatus = new(v1beta1.TaskManagerStatus)
			if recorded.Components.TaskManager != nil {
				recorded.Components.TaskManager.DeepCopyInto(*tmStatus)
			}
			(*tmStatus).State = v1beta1.ComponentStateUpdating
		} else if observedTmDeployment != nil {
			*tmStatus = &v1beta1.TaskManagerStatus{
				Name:          observedTmDeployment.Name,
				State:         getDeploymentState(observedTmDeployment),
				Replicas:      observedTmDeployment.Status.Replicas,
				ReadyReplicas: observedTmDeployment.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedTmDeployment.Status.ReadyReplicas, observedTmDeployment.Status.Replicas),
				Selector:      labelSelector.String(),
			}
			if (*tmStatus).State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.TaskManager != nil {
			*tmStatus = &v1beta1.TaskManagerStatus{
				Name:  recorded.Components.TaskManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	}

	// Derive the new cluster state.
	var jobStatus = recorded.Components.Job

	// jobRequiresStop reports whether the recorded job ended in a state whose
	// cleanup policy is anything other than keeping the cluster. It is only
	// called after jobStatus.IsStopped() returned true.
	jobRequiresStop := func() bool {
		var policy = observed.cluster.Spec.Job.CleanupPolicy
		return (jobStatus.State == v1beta1.JobStateSucceeded &&
			policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster) ||
			(jobStatus.IsFailed() &&
				policy.AfterJobFails != v1beta1.CleanupActionKeepCluster) ||
			(jobStatus.State == v1beta1.JobStateCancelled &&
				policy.AfterJobCancelled != v1beta1.CleanupActionKeepCluster)
	}

	switch recorded.State {
	case "", v1beta1.ClusterStateCreating:
		if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStateCreating
			// The job may already have stopped before every component became
			// ready; in that case go straight to Stopping.
			if jobStatus.IsStopped() && jobRequiresStop() {
				status.State = v1beta1.ClusterStateStopping
			}
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateUpdating:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if runningComponents < totalComponents {
			if recorded.Revision.IsUpdateTriggered() {
				status.State = v1beta1.ClusterStateUpdating
			} else {
				status.State = v1beta1.ClusterStateReconciling
			}
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateRunning,
		v1beta1.ClusterStateReconciling:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if !recorded.Revision.IsUpdateTriggered() && jobStatus.IsStopped() {
			if jobRequiresStop() {
				status.State = v1beta1.ClusterStateStopping
			} else {
				status.State = v1beta1.ClusterStateRunning
			}
		} else if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStateReconciling
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateStopping,
		v1beta1.ClusterStatePartiallyStopped:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if jobStatus.IsActive() {
			status.State = v1beta1.ClusterStateRunning
		} else if runningComponents == 0 {
			status.State = v1beta1.ClusterStateStopped
		} else if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStatePartiallyStopped
		} else {
			status.State = v1beta1.ClusterStateStopping
		}
	case v1beta1.ClusterStateStopped:
		if recorded.Revision.IsUpdateTriggered() {
			status.State = v1beta1.ClusterStateUpdating
		} else {
			status.State = v1beta1.ClusterStateStopped
		}
	default:
		panic(fmt.Sprintf("Unknown cluster state: %v", recorded.State))
	}

	// (Optional) Job.
	// Update job status.
	status.Components.Job = updater.deriveJobStatus(ctx)

	// (Optional) Savepoint.
	// Update savepoint status if it is in progress or requested.
	var newJobStatus = status.Components.Job
	status.Savepoint = updater.deriveSavepointStatus(
		&observed.savepoint,
		recorded.Savepoint,
		newJobStatus,
		updater.getFlinkJobID())

	// (Optional) Control.
	// Update user requested control status.
	status.Control = deriveControlStatus(
		observed.cluster,
		status.Savepoint,
		status.Components.Job,
		recorded.Control)

	// Update revision status.
	// When update completed, finish the process by marking CurrentRevision to NextRevision.
	status.Revision = deriveRevisionStatus(
		observed.updateState,
		&observed.revision,
		&recorded.Revision)

	return status
}