func (handler *FlinkClusterHandler) reconcile()

in controllers/flinkcluster/flinkcluster_controller.go [138:266]


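This handler drives one pass of the operator's control loop in four phases: observe the current state, update the cluster status, compute the desired state, and take reconciling actions. As a hedged sketch of how it might be wired up (not taken from this file), a controller-runtime entry point could construct the handler per request and delegate to it; FlinkClusterReconciler and its fields below are assumed names for illustration.

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
)

// Hypothetical wiring, for illustration only: controller-runtime calls the
// registered reconciler's Reconcile method, which constructs a
// FlinkClusterHandler for this request and delegates to reconcile below.
// FlinkClusterReconciler and its fields are assumptions, not from this file.
func (r *FlinkClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	handler := &FlinkClusterHandler{
		k8sClient:     r.Client,      // assumed field: controller-runtime client
		k8sClientset:  r.Clientset,   // assumed field: client-go clientset
		flinkClient:   r.FlinkClient, // assumed field: Flink REST API client
		eventRecorder: r.Recorder,    // assumed field: Kubernetes event recorder
	}
	return handler.reconcile(ctx, req)
}
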
func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
	request ctrl.Request) (ctrl.Result, error) {
	var k8sClient = handler.k8sClient
	var flinkClient = handler.flinkClient
	var log = logr.FromContextOrDiscard(ctx)
	var observed = &handler.observed
	var desired = &handler.desired
	var statusChanged bool
	var err error
	// Revision history client, used below to sync the FlinkCluster's revision status
	var history = history.NewHistory(k8sClient, ctx)
	log.Info("============================================================")
	log.Info("---------- 1. Observe the current state ----------")

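	// Gather the current state of the cluster's Kubernetes resources and the
	// Flink job into `observed`, using both the Kubernetes and Flink clients.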
	var observer = ClusterStateObserver{
		k8sClient:    k8sClient,
		k8sClientset: handler.k8sClientset,
		flinkClient:  flinkClient,
		request:      request,
		recorder:     handler.eventRecorder,
		history:      history,
	}
	err = observer.observe(ctx, observed)
	if err != nil {
		log.Error(err, "Failed to observe the current state")
		return ctrl.Result{}, err
	}

	// Sync history and observe revision status
	err = observer.syncRevisionStatus(observed)
	if err != nil {
		log.Error(err, "Failed to sync flinkCluster history")
		return ctrl.Result{}, err
	}

	log.Info("---------- 2. Update cluster status ----------")

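	// Compare the observed state against the recorded status and persist any
	// difference; a changed status is left to settle before further action.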
	var updater = ClusterStatusUpdater{
		k8sClient: k8sClient,
		recorder:  handler.eventRecorder,
		observed:  handler.observed,
	}
	statusChanged, err = updater.updateStatusIfChanged(ctx)
	if err != nil {
		log.Error(err, "Failed to update cluster status")
		return ctrl.Result{}, err
	}
	if statusChanged {
		log.Info(
			"Waiting for the status to stabilize before taking further actions.",
			"requeueAfter",
			5*time.Second)
		return ctrl.Result{
			Requeue: true, RequeueAfter: 5 * time.Second,
		}, nil
	}

	log.Info("---------- 3. Compute the desired state ----------")

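	// Derive the desired specs of every component from the observed state,
	// then log each desired component (or "nil" where none is wanted).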
	*desired = *getDesiredClusterState(observed)
	if desired.ConfigMap != nil {
		log = log.WithValues("ConfigMap", *desired.ConfigMap)
	} else {
		log = log.WithValues("ConfigMap", "nil")
	}
	if desired.PodDisruptionBudget != nil {
		log = log.WithValues("PodDisruptionBudget", *desired.PodDisruptionBudget)
	} else {
		log = log.WithValues("PodDisruptionBudget", "nil")
	}
	if desired.TmService != nil {
		log = log.WithValues("TaskManager Service", *desired.TmService)
	} else {
		log = log.WithValues("TaskManager Service", "nil")
	}
	if desired.JmStatefulSet != nil {
		log = log.WithValues("JobManager StatefulSet", *desired.JmStatefulSet)
	} else {
		log = log.WithValues("JobManager StatefulSet", "nil")
	}
	if desired.JmService != nil {
		log = log.WithValues("JobManager Service", *desired.JmService)
	} else {
		log = log.WithValues("JobManager Service", "nil")
	}
	if desired.JmIngress != nil {
		log = log.WithValues("JobManager Ingress", *desired.JmIngress)
	} else {
		log = log.WithValues("JobManager Ingress", "nil")
	}
	if desired.TmStatefulSet != nil {
		log = log.WithValues("TaskManager StatefulSet", *desired.TmStatefulSet)
	} else if desired.TmDeployment != nil {
		log = log.WithValues("TaskManager Deployment", *desired.TmDeployment)
	} else {
		log = log.WithValues("TaskManager", "nil")
	}
	if desired.HorizontalPodAutoscaler != nil {
		log = log.WithValues("HorizontalPodAutoscaler", *desired.HorizontalPodAutoscaler)
	} else {
		log = log.WithValues("HorizontalPodAutoscaler", "nil")
	}

	if desired.Job != nil {
		log = log.WithValues("Job", *desired.Job)
	} else {
		log = log.WithValues("Job", "nil")
	}
	log.Info("Desired state")

	log.Info("---------- 4. Take actions ----------")

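	// Take actions to move the observed state toward the desired state.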
	var reconciler = ClusterReconciler{
		k8sClient:   k8sClient,
		flinkClient: flinkClient,
		observed:    handler.observed,
		desired:     handler.desired,
		recorder:    handler.eventRecorder,
	}
	result, err := reconciler.reconcile(ctx)
	if err != nil {
		log.Error(err, "Failed to reconcile")
	}
	if result.RequeueAfter > 0 {
		log.Info("Requeue reconcile request", "after", result.RequeueAfter)
	}

	return result, err
}
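
Note on the return values: under controller-runtime, a non-nil error requeues the request with exponential backoff, while a nil error combined with a positive RequeueAfter schedules a requeue after that fixed delay, which is what the final log line reports.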