in controllers/flinkcluster/flinkcluster_updater.go [207:566]
// deriveClusterStatus computes the next FlinkClusterStatus from the status
// recorded on cluster and the freshly observed state.
//
// Every component status is derived with the same rule, checked in order:
//   - the component is not updated yet and a cluster update is pending: carry
//     over the recorded status and mark it Updating;
//   - the component is observed: report it from the observed object;
//   - the component is not observed but was recorded: mark it Deleted;
//   - otherwise the status field is left at its zero value.
//
// The JobManager StatefulSet (skipped for application mode clusters), the
// JobManager Service and the TaskManager workload each count as a running
// component when Ready. That count, together with the recorded cluster state,
// the recorded job status and the revision status, drives the new cluster
// state. Job, savepoint, control and revision statuses are delegated to their
// dedicated derive helpers.
//
// It panics if the recorded cluster state is not one of the handled states.
func (updater *ClusterStatusUpdater) deriveClusterStatus(
	ctx context.Context,
	cluster *v1beta1.FlinkCluster,
	observed *ObservedClusterState) v1beta1.FlinkClusterStatus {
	// Number of components that must be Ready before the cluster is Running.
	var totalComponents int
	if IsApplicationModeCluster(cluster) {
		// jmService, tmStatefulSet.
		totalComponents = 2
	} else {
		// jmStatefulSet, jmService, tmStatefulSet.
		totalComponents = 3
	}

	var recorded = cluster.Status
	var status = v1beta1.FlinkClusterStatus{}
	var runningComponents = 0

	// ConfigMap.
	var observedConfigMap = observed.configMap
	if !isComponentUpdated(observedConfigMap, observed.cluster) && shouldUpdateCluster(observed) {
		cmStatus := new(v1beta1.ConfigMapStatus)
		// Nothing was recorded yet when the pointer is nil; skip the copy
		// instead of calling DeepCopyInto on a nil receiver.
		if recorded.Components.ConfigMap != nil {
			recorded.Components.ConfigMap.DeepCopyInto(cmStatus)
		}
		cmStatus.State = v1beta1.ComponentStateUpdating
		status.Components.ConfigMap = cmStatus
	} else if observedConfigMap != nil {
		status.Components.ConfigMap = &v1beta1.ConfigMapStatus{
			Name:  observedConfigMap.Name,
			State: v1beta1.ComponentStateReady,
		}
	} else if recorded.Components.ConfigMap != nil {
		status.Components.ConfigMap = &v1beta1.ConfigMapStatus{
			Name:  recorded.Components.ConfigMap.Name,
			State: v1beta1.ComponentStateDeleted,
		}
	}

	// JobManager StatefulSet. Not tracked for application mode clusters.
	var observedJmStatefulSet = observed.jmStatefulSet
	if !IsApplicationModeCluster(cluster) {
		if !isComponentUpdated(observedJmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) {
			jmStatus := new(v1beta1.JobManagerStatus)
			// Guard against a status that was never recorded.
			if recorded.Components.JobManager != nil {
				recorded.Components.JobManager.DeepCopyInto(jmStatus)
			}
			jmStatus.State = v1beta1.ComponentStateUpdating
			status.Components.JobManager = jmStatus
		} else if observedJmStatefulSet != nil {
			jmStatus := &v1beta1.JobManagerStatus{
				Name:          observedJmStatefulSet.Name,
				State:         getStatefulSetState(observedJmStatefulSet),
				Replicas:      observedJmStatefulSet.Status.Replicas,
				ReadyReplicas: observedJmStatefulSet.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedJmStatefulSet.Status.ReadyReplicas, observedJmStatefulSet.Status.Replicas),
			}
			status.Components.JobManager = jmStatus
			if jmStatus.State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.JobManager != nil {
			status.Components.JobManager = &v1beta1.JobManagerStatus{
				Name:  recorded.Components.JobManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	}

	// JobManager service.
	var observedJmService = observed.jmService
	if !isComponentUpdated(observedJmService, observed.cluster) && shouldUpdateCluster(observed) {
		recorded.Components.JobManagerService.DeepCopyInto(&status.Components.JobManagerService)
		status.Components.JobManagerService.State = v1beta1.ComponentStateUpdating
	} else if observedJmService != nil {
		var nodePort int32
		var loadBalancerIngress []corev1.LoadBalancerIngress
		state := v1beta1.ComponentStateNotReady
		// Readiness depends on the service type; any other type stays NotReady.
		switch observedJmService.Spec.Type {
		case corev1.ServiceTypeClusterIP:
			if observedJmService.Spec.ClusterIP != "" {
				state = v1beta1.ComponentStateReady
			}
		case corev1.ServiceTypeLoadBalancer:
			if len(observedJmService.Status.LoadBalancer.Ingress) > 0 {
				state = v1beta1.ComponentStateReady
				loadBalancerIngress = observedJmService.Status.LoadBalancer.Ingress
			}
		case corev1.ServiceTypeNodePort:
			if len(observedJmService.Spec.Ports) > 0 {
				state = v1beta1.ComponentStateReady
				// Report the node port of the port named "ui"; if several
				// ports carry that name the last one wins.
				for _, port := range observedJmService.Spec.Ports {
					if port.Name == "ui" {
						nodePort = port.NodePort
					}
				}
			}
		}
		if state == v1beta1.ComponentStateReady {
			runningComponents++
		}
		status.Components.JobManagerService =
			v1beta1.JobManagerServiceStatus{
				Name:                observedJmService.Name,
				State:               state,
				NodePort:            nodePort,
				LoadBalancerIngress: loadBalancerIngress,
			}
	} else if recorded.Components.JobManagerService.Name != "" {
		status.Components.JobManagerService =
			v1beta1.JobManagerServiceStatus{
				Name:  recorded.Components.JobManagerService.Name,
				State: v1beta1.ComponentStateDeleted,
			}
	}

	// (Optional) JobManager ingress. It does not count as a running component.
	var observedJmIngress = observed.jmIngress
	if !isComponentUpdated(observedJmIngress, observed.cluster) && shouldUpdateCluster(observed) {
		status.Components.JobManagerIngress = &v1beta1.JobManagerIngressStatus{}
		// The ingress is optional, so there may be no recorded status to copy.
		if recorded.Components.JobManagerIngress != nil {
			recorded.Components.JobManagerIngress.DeepCopyInto(status.Components.JobManagerIngress)
		}
		status.Components.JobManagerIngress.State = v1beta1.ComponentStateUpdating
	} else if observedJmIngress != nil {
		var state v1beta1.ComponentState
		var urls []string
		var loadbalancerReady bool
		var useTLS = len(observedJmIngress.Spec.TLS) > 0

		// Prefer hosts declared in the ingress spec: TLS hosts when TLS is
		// configured, rule hosts otherwise.
		if useTLS {
			for _, tls := range observedJmIngress.Spec.TLS {
				for _, host := range tls.Hosts {
					if host != "" {
						urls = append(urls, "https://"+host)
					}
				}
			}
		} else {
			for _, rule := range observedJmIngress.Spec.Rules {
				if rule.Host != "" {
					urls = append(urls, "http://"+rule.Host)
				}
			}
		}
		var useHost = len(urls) > 0

		// Check loadbalancer is ready.
		for _, ingress := range observedJmIngress.Status.LoadBalancer.Ingress {
			// Get loadbalancer address, hostname first. The address is
			// resolved per entry so that an entry without any address does
			// not reuse (and duplicate) the address of a previous entry.
			addr := ingress.Hostname
			if addr == "" {
				addr = ingress.IP
			}
			if addr == "" {
				continue
			}
			// If any ready LB found, state is ready.
			loadbalancerReady = true
			// If ingress spec does not have host, get ip or hostname of loadbalancer.
			if !useHost {
				if useTLS {
					urls = append(urls, "https://"+addr)
				} else {
					urls = append(urls, "http://"+addr)
				}
			}
		}

		// Jobmanager ingress state becomes ready when a LB address for the
		// ingress is reported.
		if loadbalancerReady {
			state = v1beta1.ComponentStateReady
		} else {
			state = v1beta1.ComponentStateNotReady
		}
		status.Components.JobManagerIngress =
			&v1beta1.JobManagerIngressStatus{
				Name:  observedJmIngress.Name,
				State: state,
				URLs:  urls,
			}
	} else if recorded.Components.JobManagerIngress != nil &&
		recorded.Components.JobManagerIngress.Name != "" {
		status.Components.JobManagerIngress =
			&v1beta1.JobManagerIngressStatus{
				Name:  recorded.Components.JobManagerIngress.Name,
				State: v1beta1.ComponentStateDeleted,
			}
	}

	// TaskManager workload: a StatefulSet when the deployment type is unset or
	// StatefulSet, a Deployment otherwise.
	labelSelector := labels.SelectorFromSet(getComponentLabels(cluster, "taskmanager"))
	var clusterTmDeploymentType = cluster.Spec.TaskManager.DeploymentType
	if clusterTmDeploymentType == "" || clusterTmDeploymentType == v1beta1.DeploymentTypeStatefulSet {
		// TaskManager StatefulSet.
		var observedTmStatefulSet = observed.tmStatefulSet
		if !isComponentUpdated(observedTmStatefulSet, observed.cluster) && shouldUpdateCluster(observed) {
			tmStatus := new(v1beta1.TaskManagerStatus)
			// Guard against a status that was never recorded.
			if recorded.Components.TaskManager != nil {
				recorded.Components.TaskManager.DeepCopyInto(tmStatus)
			}
			tmStatus.State = v1beta1.ComponentStateUpdating
			status.Components.TaskManager = tmStatus
		} else if observedTmStatefulSet != nil {
			tmStatus := &v1beta1.TaskManagerStatus{
				Name:          observedTmStatefulSet.Name,
				State:         getStatefulSetState(observedTmStatefulSet),
				Replicas:      observedTmStatefulSet.Status.Replicas,
				ReadyReplicas: observedTmStatefulSet.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedTmStatefulSet.Status.ReadyReplicas, observedTmStatefulSet.Status.Replicas),
				Selector:      labelSelector.String(),
			}
			status.Components.TaskManager = tmStatus
			if tmStatus.State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.TaskManager != nil {
			status.Components.TaskManager = &v1beta1.TaskManagerStatus{
				Name:  recorded.Components.TaskManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	} else {
		// TaskManager Deployment.
		var observedTmDeployment = observed.tmDeployment
		if !isComponentUpdated(observedTmDeployment, observed.cluster) && shouldUpdateCluster(observed) {
			tmStatus := new(v1beta1.TaskManagerStatus)
			// Guard against a status that was never recorded.
			if recorded.Components.TaskManager != nil {
				recorded.Components.TaskManager.DeepCopyInto(tmStatus)
			}
			tmStatus.State = v1beta1.ComponentStateUpdating
			status.Components.TaskManager = tmStatus
		} else if observedTmDeployment != nil {
			tmStatus := &v1beta1.TaskManagerStatus{
				Name:          observedTmDeployment.Name,
				State:         getDeploymentState(observedTmDeployment),
				Replicas:      observedTmDeployment.Status.Replicas,
				ReadyReplicas: observedTmDeployment.Status.ReadyReplicas,
				Ready:         fmt.Sprintf("%d/%d", observedTmDeployment.Status.ReadyReplicas, observedTmDeployment.Status.Replicas),
				Selector:      labelSelector.String(),
			}
			status.Components.TaskManager = tmStatus
			if tmStatus.State == v1beta1.ComponentStateReady {
				runningComponents++
			}
		} else if recorded.Components.TaskManager != nil {
			status.Components.TaskManager = &v1beta1.TaskManagerStatus{
				Name:  recorded.Components.TaskManager.Name,
				State: v1beta1.ComponentStateDeleted,
			}
		}
	}

	// Derive the new cluster state.
	var jobStatus = recorded.Components.Job

	// stopAfterJob reports whether the cleanup policy configured for the
	// state the recorded job ended in (succeeded, failed or cancelled) is
	// anything other than KeepCluster. It reads the job spec, so it is only
	// called after jobStatus.IsStopped() returned true.
	stopAfterJob := func() bool {
		var policy = observed.cluster.Spec.Job.CleanupPolicy
		return (jobStatus.State == v1beta1.JobStateSucceeded &&
			policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster) ||
			(jobStatus.IsFailed() &&
				policy.AfterJobFails != v1beta1.CleanupActionKeepCluster) ||
			(jobStatus.State == v1beta1.JobStateCancelled &&
				policy.AfterJobCancelled != v1beta1.CleanupActionKeepCluster)
	}

	switch recorded.State {
	case "", v1beta1.ClusterStateCreating:
		if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStateCreating
			// The job may already have finished before every component
			// became ready; honor the cleanup policy in that case.
			if jobStatus.IsStopped() && stopAfterJob() {
				status.State = v1beta1.ClusterStateStopping
			}
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateUpdating:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if runningComponents < totalComponents {
			if recorded.Revision.IsUpdateTriggered() {
				status.State = v1beta1.ClusterStateUpdating
			} else {
				status.State = v1beta1.ClusterStateReconciling
			}
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateRunning,
		v1beta1.ClusterStateReconciling:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if !recorded.Revision.IsUpdateTriggered() && jobStatus.IsStopped() {
			if stopAfterJob() {
				status.State = v1beta1.ClusterStateStopping
			} else {
				status.State = v1beta1.ClusterStateRunning
			}
		} else if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStateReconciling
		} else {
			status.State = v1beta1.ClusterStateRunning
		}
	case v1beta1.ClusterStateStopping,
		v1beta1.ClusterStatePartiallyStopped:
		if shouldUpdateCluster(observed) {
			status.State = v1beta1.ClusterStateUpdating
		} else if jobStatus.IsActive() {
			status.State = v1beta1.ClusterStateRunning
		} else if runningComponents == 0 {
			status.State = v1beta1.ClusterStateStopped
		} else if runningComponents < totalComponents {
			status.State = v1beta1.ClusterStatePartiallyStopped
		} else {
			status.State = v1beta1.ClusterStateStopping
		}
	case v1beta1.ClusterStateStopped:
		if recorded.Revision.IsUpdateTriggered() {
			status.State = v1beta1.ClusterStateUpdating
		} else {
			status.State = v1beta1.ClusterStateStopped
		}
	default:
		// A recorded state outside the cases above is treated as a
		// programming error.
		panic(fmt.Sprintf("Unknown cluster state: %v", recorded.State))
	}

	// (Optional) Job.
	// Update job status.
	status.Components.Job = updater.deriveJobStatus(ctx)

	// (Optional) Savepoint.
	// Update savepoint status if it is in progress or requested.
	var newJobStatus = status.Components.Job
	status.Savepoint = updater.deriveSavepointStatus(
		&observed.savepoint,
		recorded.Savepoint,
		newJobStatus,
		updater.getFlinkJobID())

	// (Optional) Control.
	// Update user requested control status.
	status.Control = deriveControlStatus(
		observed.cluster,
		status.Savepoint,
		status.Components.Job,
		recorded.Control)

	// Update revision status.
	// When update completed, finish the process by marking CurrentRevision to NextRevision.
	status.Revision = deriveRevisionStatus(
		observed.updateState,
		&observed.revision,
		&recorded.Revision)

	return status
}