in controllers/flinkcluster/flinkcluster_controller.go [138:266]
func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
request ctrl.Request) (ctrl.Result, error) {
var k8sClient = handler.k8sClient
var flinkClient = handler.flinkClient
var log = logr.FromContextOrDiscard(ctx)
var observed = &handler.observed
var desired = &handler.desired
var statusChanged bool
var err error
// History interface
var history = history.NewHistory(k8sClient, ctx)
log.Info("============================================================")
log.Info("---------- 1. Observe the current state ----------")
var observer = ClusterStateObserver{
k8sClient: k8sClient,
k8sClientset: handler.k8sClientset,
flinkClient: flinkClient,
request: request,
recorder: handler.eventRecorder,
history: history,
}
err = observer.observe(ctx, observed)
if err != nil {
log.Error(err, "Failed to observe the current state")
return ctrl.Result{}, err
}
// Sync history and observe revision status
err = observer.syncRevisionStatus(observed)
if err != nil {
log.Error(err, "Failed to sync flinkCluster history")
return ctrl.Result{}, err
}
log.Info("---------- 2. Update cluster status ----------")
var updater = ClusterStatusUpdater{
k8sClient: k8sClient,
recorder: handler.eventRecorder,
observed: handler.observed,
}
statusChanged, err = updater.updateStatusIfChanged(ctx)
if err != nil {
log.Error(err, "Failed to update cluster status")
return ctrl.Result{}, err
}
if statusChanged {
log.Info(
"Wait status to be stable before taking further actions.",
"requeueAfter",
5)
return ctrl.Result{
Requeue: true, RequeueAfter: 5 * time.Second,
}, nil
}
log.Info("---------- 3. Compute the desired state ----------")
*desired = *getDesiredClusterState(observed)
if desired.ConfigMap != nil {
log = log.WithValues("ConfigMap", *desired.ConfigMap)
} else {
log = log.WithValues("ConfigMap", "nil")
}
if desired.PodDisruptionBudget != nil {
log = log.WithValues("PodDisruptionBudget", *desired.PodDisruptionBudget)
} else {
log = log.WithValues("PodDisruptionBudget", "nil")
}
if desired.TmService != nil {
log = log.WithValues("TaskManager Service", *desired.TmService)
} else {
log = log.WithValues("TaskManager Service", "nil")
}
if desired.JmStatefulSet != nil {
log = log.WithValues("JobManager StatefulSet", *desired.JmStatefulSet)
} else {
log = log.WithValues("JobManager StatefulSet", "nil")
}
if desired.JmService != nil {
log = log.WithValues("JobManager service", *desired.JmService)
} else {
log = log.WithValues("JobManager service", "nil")
}
if desired.JmIngress != nil {
log = log.WithValues("JobManager ingress", *desired.JmIngress)
} else {
log = log.WithValues("JobManager ingress", "nil")
}
if desired.TmStatefulSet != nil {
log = log.WithValues("TaskManager StatefulSet", *desired.TmStatefulSet)
} else if desired.TmDeployment != nil {
log = log.WithValues("TaskManager Deployment", *desired.TmDeployment)
} else {
log = log.WithValues("TaskManager", "nil")
}
if desired.HorizontalPodAutoscaler != nil {
log = log.WithValues("HorizontalPodAutoscaler", *desired.HorizontalPodAutoscaler)
} else {
log = log.WithValues("HorizontalPodAutoscaler", "nil")
}
if desired.Job != nil {
log = log.WithValues("Job", *desired.Job)
} else {
log = log.WithValues("Job", "nil")
}
log.Info("Desired state")
log.Info("---------- 4. Take actions ----------")
var reconciler = ClusterReconciler{
k8sClient: k8sClient,
flinkClient: flinkClient,
observed: handler.observed,
desired: handler.desired,
recorder: handler.eventRecorder,
}
result, err := reconciler.reconcile(ctx)
if err != nil {
log.Error(err, "Failed to reconcile")
}
if result.RequeueAfter > 0 {
log.Info("Requeue reconcile request", "after", result.RequeueAfter)
}
return result, err
}