in main.go [65:122]
// main is the operator entry point. It wires logging to the command-line
// flags, builds the controller manager, registers the FlinkCluster reconciler
// and (unless disabled) its webhooks, and then runs the manager. Any failure
// along the way is logged through setupLog and terminates the process with
// exit code 1.
func main() {
	// The zap flags have to be registered on the default flag set before the
	// command line is parsed.
	zapOpts := zap.Options{Development: true}
	zapOpts.BindFlags(flag.CommandLine)
	flag.Parse()

	baseLogger := zap.New(zap.UseFlagOptions(&zapOpts))
	ctrl.SetLogger(baseLogger.WithName("controllers").WithName("FlinkCluster"))

	// Restrict the cache to a single namespace only when one was requested;
	// otherwise the map is handed to the manager empty.
	// NOTE(review): presumably an empty map means "all namespaces" — confirm
	// against the controller-runtime cache.Options documentation.
	namespaces := map[string]cache.Config{}
	if ns := *watchNamespace; ns != "" {
		setupLog.Info("Watching custom resources in the namespace", "namespace", ns)
		namespaces[ns] = cache.Config{}
	}

	restConfig := ctrl.GetConfigOrDie()
	managerOpts := ctrl.Options{
		Scheme:           scheme,
		Metrics:          metricsserver.Options{BindAddress: *metricsAddr},
		LeaderElection:   *enableLeaderElection,
		LeaderElectionID: *leaderElectionID,
		Cache:            cache.Options{DefaultNamespaces: namespaces},
	}
	mgr, err := ctrl.NewManager(restConfig, managerOpts)
	if err != nil {
		setupLog.Error(err, "Unable to start manager")
		os.Exit(1)
	}

	reconciler, err := flinkcluster.NewReconciler(mgr)
	if err != nil {
		setupLog.Error(err, "Unable to create reconciler")
		os.Exit(1)
	}
	if err := reconciler.SetupWithManager(mgr, *maxConcurrentReconciles); err != nil {
		setupLog.Error(err, "Unable to create controller", "controller", "FlinkCluster")
		os.Exit(1)
	}

	// Webhooks for the custom resource are registered by default. Set
	// `FLINK_OPERATOR_ENABLE_WEBHOOKS=false` to skip them, e.g. when running
	// the operator locally.
	webhooksEnabled := os.Getenv("FLINK_OPERATOR_ENABLE_WEBHOOKS") != "false"
	if webhooksEnabled {
		webhookErr := (&v1beta1.FlinkCluster{}).SetupWebhookWithManager(mgr)
		if webhookErr != nil {
			setupLog.Error(webhookErr, "Unable to setup webhooks", "webhook", "FlinkCluster")
			os.Exit(1)
		}
	}
	// +kubebuilder:scaffold:builder

	setupLog.Info("Starting manager")
	// Start is handed the context from ctrl.SetupSignalHandler; only a
	// non-nil error from it is treated as a failure.
	runErr := mgr.Start(ctrl.SetupSignalHandler())
	if runErr != nil {
		setupLog.Error(runErr, "Problem running manager")
		os.Exit(1)
	}
}