controllers/flinkcluster/flinkcluster_controller.go
/*
Copyright 2019 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flinkcluster

import (
	"context"
	"os"
	"time"

	"github.com/go-logr/logr"
	v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
	"github.com/spotify/flink-on-k8s-operator/internal/controllers/history"
	"github.com/spotify/flink-on-k8s-operator/internal/flink"
	"github.com/spotify/flink-on-k8s-operator/internal/model"
	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	ctrlcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// controllerKind is the schema.GroupVersionKind for the FlinkCluster type
// reconciled by this controller.
var controllerKind = v1beta1.GroupVersion.WithKind("FlinkCluster")

// FlinkClusterReconciler reconciles a FlinkCluster object.
type FlinkClusterReconciler struct {
	Client        client.Client
	Clientset     *kubernetes.Clientset
	EventRecorder record.EventRecorder
	Sharder       *Sharder
}
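
// A minimal sketch of constructing the reconciler directly, for example in a
// test where sharding is not exercised (fake.NewClientBuilder comes from
// sigs.k8s.io/controller-runtime/pkg/client/fake; whether a nil Sharder is
// tolerated is an assumption, not guaranteed by this file):
//
//	r := &FlinkClusterReconciler{
//		Client:        fake.NewClientBuilder().Build(),
//		EventRecorder: record.NewFakeRecorder(16),
//	}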

// NewReconciler builds a FlinkClusterReconciler for the given manager. It
// creates a typed Kubernetes clientset from the manager's REST config and a
// Sharder from the "total-shards" and "pod-name" environment variables, so
// each operator replica reconciles only the clusters assigned to its shard.
func NewReconciler(mgr manager.Manager) (*FlinkClusterReconciler, error) {
	cs, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		return nil, err
	}

	sh, err := NewSharderFromEnv(os.Getenv("total-shards"), os.Getenv("pod-name"))
	if err != nil {
		return nil, err
	}

	return &FlinkClusterReconciler{
		Client:        mgr.GetClient(),
		Clientset:     cs,
		EventRecorder: mgr.GetEventRecorderFor("FlinkOperator"),
		Sharder:       sh,
	}, nil
}
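
// A minimal sketch of exercising the env-driven sharding setup directly, for
// example in a test (the "flink-operator-1" pod name format is an assumption
// for illustration; NewSharderFromEnv defines what it actually accepts):
//
//	os.Setenv("total-shards", "3")
//	os.Setenv("pod-name", "flink-operator-1")
//	sh, err := NewSharderFromEnv(os.Getenv("total-shards"), os.Getenv("pod-name"))
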
// +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services/status,verbs=get
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events/status,verbs=get
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses/status,verbs=get
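
// The kubebuilder markers above are consumed by controller-gen to generate
// the operator's RBAC manifests. A typical regeneration command (the role
// name and output path are assumptions, not recorded in this file):
//
//	controller-gen rbac:roleName=flink-operator-role paths="./..." output:rbac:artifacts:config=config/rbac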

// Reconcile drives the observed state of a FlinkCluster custom resource
// towards the desired state described by its spec.
func (r *FlinkClusterReconciler) Reconcile(ctx context.Context,
	request ctrl.Request) (ctrl.Result, error) {
	log := logr.FromContextOrDiscard(ctx)

	// Skip clusters that belong to another shard of the operator.
	if !r.Sharder.IsOwnedByMe(request.Namespace, request.Name) {
		log.Info("Not owned by shard, skipping", "namespace", request.Namespace, "name", request.Name)
		return ctrl.Result{}, nil
	}

	handler := FlinkClusterHandler{
		k8sClient:     r.Client,
		k8sClientset:  r.Clientset,
		flinkClient:   flink.NewDefaultClient(log),
		request:       request,
		eventRecorder: r.EventRecorder,
		observed:      ObservedClusterState{},
	}
	return handler.reconcile(logr.NewContext(ctx, log), request)
}
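
// A minimal sketch of the ownership test a Sharder of this shape could
// implement; the hash-modulo scheme below (using hash/fnv) is an assumption
// for illustration, not the actual implementation in this package:
//
//	func ownedBy(namespace, name string, shardIndex, totalShards int) bool {
//		h := fnv.New32a()
//		h.Write([]byte(namespace + "/" + name))
//		return int(h.Sum32())%totalShards == shardIndex
//	}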

// SetupWithManager registers this reconciler with the controller manager and
// starts watching FlinkCluster resources along with the Deployments,
// StatefulSets, Services, and Jobs they own.
func (reconciler *FlinkClusterReconciler) SetupWithManager(
	mgr ctrl.Manager,
	maxConcurrentReconciles int) error {
	return ctrl.NewControllerManagedBy(mgr).
		WithOptions(ctrlcontroller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
		For(&v1beta1.FlinkCluster{}).
		Owns(&appsv1.Deployment{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Owns(&batchv1.Job{}).
		Complete(reconciler)
}
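
// A minimal sketch of wiring this reconciler into an operator binary (the
// manager options, concurrency value, and error handling below are
// assumptions, not part of this file):
//
//	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
//	if err != nil {
//		panic(err)
//	}
//	r, err := NewReconciler(mgr)
//	if err != nil {
//		panic(err)
//	}
//	if err := r.SetupWithManager(mgr, 4); err != nil {
//		panic(err)
//	}
//	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
//		panic(err)
//	}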

// FlinkClusterHandler holds the context and state for a single reconcile
// request.
type FlinkClusterHandler struct {
	k8sClient     client.Client
	k8sClientset  *kubernetes.Clientset
	flinkClient   *flink.Client
	request       ctrl.Request
	eventRecorder record.EventRecorder
	observed      ObservedClusterState
	desired       model.DesiredClusterState
}

func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
	request ctrl.Request) (ctrl.Result, error) {
	var k8sClient = handler.k8sClient
	var flinkClient = handler.flinkClient
	var log = logr.FromContextOrDiscard(ctx)
	var observed = &handler.observed
	var desired = &handler.desired
	var statusChanged bool
	var err error

	// History interface used to track controller revisions of the cluster.
	var history = history.NewHistory(k8sClient, ctx)

	log.Info("============================================================")
	log.Info("---------- 1. Observe the current state ----------")

	var observer = ClusterStateObserver{
		k8sClient:    k8sClient,
		k8sClientset: handler.k8sClientset,
		flinkClient:  flinkClient,
		request:      request,
		recorder:     handler.eventRecorder,
		history:      history,
	}
	err = observer.observe(ctx, observed)
	if err != nil {
		log.Error(err, "Failed to observe the current state")
		return ctrl.Result{}, err
	}

	// Sync history and observe revision status.
	err = observer.syncRevisionStatus(observed)
	if err != nil {
		log.Error(err, "Failed to sync FlinkCluster history")
		return ctrl.Result{}, err
	}
log.Info("---------- 2. Update cluster status ----------")
var updater = ClusterStatusUpdater{
k8sClient: k8sClient,
recorder: handler.eventRecorder,
observed: handler.observed,
}
statusChanged, err = updater.updateStatusIfChanged(ctx)
if err != nil {
log.Error(err, "Failed to update cluster status")
return ctrl.Result{}, err
}
if statusChanged {
log.Info(
"Wait status to be stable before taking further actions.",
"requeueAfter",
5)
return ctrl.Result{
Requeue: true, RequeueAfter: 5 * time.Second,
}, nil
}
log.Info("---------- 3. Compute the desired state ----------")
*desired = *getDesiredClusterState(observed)
if desired.ConfigMap != nil {
log = log.WithValues("ConfigMap", *desired.ConfigMap)
} else {
log = log.WithValues("ConfigMap", "nil")
}
if desired.PodDisruptionBudget != nil {
log = log.WithValues("PodDisruptionBudget", *desired.PodDisruptionBudget)
} else {
log = log.WithValues("PodDisruptionBudget", "nil")
}
if desired.TmService != nil {
log = log.WithValues("TaskManager Service", *desired.TmService)
} else {
log = log.WithValues("TaskManager Service", "nil")
}
if desired.JmStatefulSet != nil {
log = log.WithValues("JobManager StatefulSet", *desired.JmStatefulSet)
} else {
log = log.WithValues("JobManager StatefulSet", "nil")
}
if desired.JmService != nil {
log = log.WithValues("JobManager service", *desired.JmService)
} else {
log = log.WithValues("JobManager service", "nil")
}
if desired.JmIngress != nil {
log = log.WithValues("JobManager ingress", *desired.JmIngress)
} else {
log = log.WithValues("JobManager ingress", "nil")
}
if desired.TmStatefulSet != nil {
log = log.WithValues("TaskManager StatefulSet", *desired.TmStatefulSet)
} else if desired.TmDeployment != nil {
log = log.WithValues("TaskManager Deployment", *desired.TmDeployment)
} else {
log = log.WithValues("TaskManager", "nil")
}
if desired.HorizontalPodAutoscaler != nil {
log = log.WithValues("HorizontalPodAutoscaler", *desired.HorizontalPodAutoscaler)
} else {
log = log.WithValues("HorizontalPodAutoscaler", "nil")
}
if desired.Job != nil {
log = log.WithValues("Job", *desired.Job)
} else {
log = log.WithValues("Job", "nil")
}
log.Info("Desired state")
log.Info("---------- 4. Take actions ----------")
var reconciler = ClusterReconciler{
k8sClient: k8sClient,
flinkClient: flinkClient,
observed: handler.observed,
desired: handler.desired,
recorder: handler.eventRecorder,
}
result, err := reconciler.reconcile(ctx)
if err != nil {
log.Error(err, "Failed to reconcile")
}
if result.RequeueAfter > 0 {
log.Info("Requeue reconcile request", "after", result.RequeueAfter)
}
return result, err
}