controller/codebase/controller.go (394 lines of code) (raw):
package codebase
import (
"context"
"ddm-admin-console/app/registry"
"ddm-admin-console/config"
"ddm-admin-console/controller"
"ddm-admin-console/service"
codebaseService "ddm-admin-console/service/codebase"
gerritService "ddm-admin-console/service/gerrit"
"ddm-admin-console/service/git"
"encoding/json"
"errors"
"fmt"
"os"
"path"
"reflect"
"time"
"github.com/patrickmn/go-cache"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Constants shared by the codebase controller.
const (
	// registryRemoteName is the name of the git remote that is added for the
	// registry repository and used for pull and push.
	registryRemoteName = "registry"

	// keySecretIndex is the key inside the Gerrit secret data under which the
	// private key is stored.
	keySecretIndex = "id_rsa"

	// DefaultRetryTimeout is the delay used before a failed or postponed
	// reconciliation is attempted again.
	DefaultRetryTimeout = 15 * time.Second
)
// Controller reconciles Codebase resources of the registry type: it seeds the
// registry repository from its template and keeps the inactive-branches status
// annotation up to date.
type Controller struct {
// logger receives the structured reconciliation log messages.
logger controller.Logger
// mgr is the controller-runtime manager this controller is registered with.
mgr ctrl.Manager
// k8sClient is the manager's client, used to read and update cluster objects.
k8sClient client.Client
// cnf holds the application settings (git credentials, temp folder, namespace).
cnf *config.Settings
// appCache may hold files that have to be committed into a new registry repository.
appCache *cache.Cache
// versionFilter decides whether a codebase's registry version is handled by this controller.
versionFilter *registry.VersionFilter
// gerrit gives access to Gerrit projects.
gerrit gerritService.ServiceInterface
// codebase gives access to codebases and their branches.
codebase codebaseService.ServiceInterface
}
// AdminSyncer is implemented by services able to synchronize the administrators
// of a registry.
// NOTE(review): neither an implementation nor a caller is visible in this file —
// confirm where it is used.
type AdminSyncer interface {
// SyncAdmins presumably applies the given admins list to the registry named
// registryName; the exact semantics are defined by the implementation.
SyncAdmins(ctx context.Context, registryName string, admins []registry.Admin) error
}
// Make creates the codebase controller and registers it with mgr.
//
// The controller watches Codebase objects; update events are filtered through
// isSpecUpdated. An error is returned when the registry version filter cannot
// be built from cnf.RegistryVersionFilter or when the controller cannot be
// registered with the manager.
func Make(mgr ctrl.Manager, logger controller.Logger, cnf *config.Settings, _c *cache.Cache,
	gerrit gerritService.ServiceInterface, cbService codebaseService.ServiceInterface) error {
	reconciler := &Controller{
		mgr:       mgr,
		logger:    logger,
		k8sClient: mgr.GetClient(),
		cnf:       cnf,
		appCache:  _c,
		gerrit:    gerrit,
		codebase:  cbService,
	}

	versionFilter, err := registry.MakeVersionFilter(cnf.RegistryVersionFilter)
	if err != nil {
		return fmt.Errorf("unable to init version filter, %w", err)
	}
	reconciler.versionFilter = versionFilter

	// Only the update events accepted by isSpecUpdated reach the reconciler.
	updateFilter := builder.WithPredicates(predicate.Funcs{UpdateFunc: isSpecUpdated})

	err = ctrl.NewControllerManagedBy(mgr).
		For(&codebaseService.Codebase{}, updateFilter).
		Complete(reconciler)
	if err != nil {
		return fmt.Errorf("unable to create controller, %w", err)
	}

	return nil
}
func ProcessRegistryVersion(
ctx context.Context,
versionFilter *registry.VersionFilter,
cb *codebaseService.Codebase,
gr gerritService.ServiceInterface,
) (
bool,
error,
) {
cbs := []codebaseService.Codebase{*cb}
if err := registry.LoadRegistryVersions(ctx, gr, cbs); err != nil {
return false, fmt.Errorf("unable to load registry version, %w", err)
}
registryVersionCodebase := cbs[0]
if registryVersionCodebase.Version.Original() == "0" {
clusterProject, err := gr.GetProject(ctx, cb.Name)
if err != nil {
return false, fmt.Errorf("unable to get cluster gerrit project, %w", err)
}
registryVersionCodebase.Version = registry.LowestVersion(registry.UpdateBranches(clusterProject.Status.Branches))
}
return versionFilter.CheckCodebase(®istryVersionCodebase), nil
}
// GerritSSHURL builds the base SSH URL ("ssh://user@host:port") of the Gerrit
// server from the git username, host and port stored in cnf.
func GerritSSHURL(cnf *config.Settings) string {
	user, host, port := cnf.GitUsername, cnf.GitHost, cnf.GitPort

	return fmt.Sprintf("ssh://%s@%s:%s", user, host, port)
}
// isSpecUpdated is the update predicate of the controller. It accepts an update
// event when the codebase spec has changed, or when the deletion timestamp has
// just been set (it was zero on the old object and is non-zero on the new one).
// Both objects of the event are expected to be *codebaseService.Codebase.
func isSpecUpdated(e event.UpdateEvent) bool {
	previous := e.ObjectOld.(*codebaseService.Codebase)
	current := e.ObjectNew.(*codebaseService.Codebase)

	if !reflect.DeepEqual(previous.Spec, current.Spec) {
		return true
	}

	wasAlive := previous.GetDeletionTimestamp().IsZero()
	isDeleting := !current.GetDeletionTimestamp().IsZero()

	return wasAlive && isDeleting
}
// Reconcile is the controller-runtime entry point for a single codebase request.
//
// Failures are logged and never returned to the caller: the request is
// re-queued after a delay instead. The delay is the duration carried by an
// ErrPostpone found in the error chain, or DefaultRetryTimeout for any other
// error.
func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	c.logger.Infow("reconciling codebase", "Request.Namespace", request.Namespace,
		"Request.Name", request.Name)

	if err := c.reconcile(ctx, request); err != nil {
		c.logger.Errorw(err.Error(), "Request.Namespace", request.Namespace, "Request.Name", request.Name)

		retryAfter := DefaultRetryTimeout
		if IsErrPostpone(err) {
			// errors.As walks the whole chain, so the postpone duration is
			// found regardless of how many times the error was wrapped; an
			// unchecked type assertion on a single Unwrap would panic otherwise.
			var postpone ErrPostpone
			if errors.As(err, &postpone) {
				retryAfter = postpone.D()
			}
		}

		return reconcile.Result{RequeueAfter: retryAfter}, nil
	}

	c.logger.Infow("reconciling done", "Request.Namespace", request.Namespace,
		"Request.Name", request.Name)

	return reconcile.Result{}, nil
}
// reconcile processes one codebase request.
//
// The request is silently skipped (nil is returned) when the codebase no longer
// exists, when it is not of the registry type, or when its registry version is
// rejected by the version filter. Otherwise the import repository is updated
// and the branch statuses are checked, in that order.
//
// Errors coming from these two steps are wrapped exactly once before being
// returned, including ErrPostpone values.
func (c *Controller) reconcile(ctx context.Context, request reconcile.Request) error {
	var instance codebaseService.Codebase
	if err := c.k8sClient.Get(ctx, request.NamespacedName, &instance); err != nil {
		if k8sErrors.IsNotFound(err) {
			c.logger.Infow("instance not found", "Request.Namespace", request.Namespace, "Request.Name", request.Name)
			return nil
		}

		return fmt.Errorf("unable to get codebase from k8s, %w", err)
	}

	if instance.Spec.Type != codebaseService.RegistryCodebaseType {
		return nil
	}

	processRequest, err := ProcessRegistryVersion(ctx, c.versionFilter, &instance, c.gerrit)
	if err != nil {
		return fmt.Errorf("unable to process registry version, %w", err)
	}

	if !processRequest {
		c.logger.Infow("reconciling codebase skipped, wrong registry version",
			"Request.Namespace", request.Namespace, "Request.Name", request.Name)
		return nil
	}

	if err := c.updateImportRepo(ctx, &instance); err != nil {
		return fmt.Errorf("unable to update import repo, %w", err)
	}

	if err := c.checkBranchesStatus(ctx, &instance); err != nil {
		return fmt.Errorf("unable to check branches statuses, %w", err)
	}

	return nil
}
// checkBranchesStatus keeps the status annotation of the codebase in sync with
// the state of its branches.
//
// When at least one branch is not active, the codebase is annotated with
// StatusAnnotationInactiveBranches, updated, and ErrPostpone is returned so the
// check is repeated after DefaultRetryTimeout. When every branch is active and
// the codebase still carries that annotation value, the annotation is removed
// and the codebase is updated.
func (c *Controller) checkBranchesStatus(ctx context.Context, instance *codebaseService.Codebase) error {
	branches, err := c.codebase.GetBranchesByCodebase(ctx, instance.Name)
	if err != nil {
		return fmt.Errorf("unable to get codebase branches, %w", err)
	}

	inactiveBranches := false
	for _, b := range branches {
		if b.Status.Value != codebaseService.BranchStatusActive {
			inactiveBranches = true
			break
		}
	}

	if inactiveBranches {
		// A codebase without any annotation has a nil map; writing to it
		// would panic.
		if instance.Annotations == nil {
			instance.Annotations = make(map[string]string)
		}

		instance.Annotations[codebaseService.StatusAnnotation] = codebaseService.StatusAnnotationInactiveBranches
		if err := c.codebase.Update(ctx, instance); err != nil {
			return fmt.Errorf("unable to update instance, %w", err)
		}

		return ErrPostpone(DefaultRetryTimeout)
	}

	if annotationStatus, ok := instance.Annotations[codebaseService.StatusAnnotation]; ok &&
		annotationStatus == codebaseService.StatusAnnotationInactiveBranches {
		delete(instance.Annotations, codebaseService.StatusAnnotation)

		if err := c.codebase.Update(ctx, instance); err != nil {
			return fmt.Errorf("unable to update instance, %w", err)
		}
	}

	return nil
}
// updateImportRepo seeds the registry repository of a codebase whose git URL
// path is still marked as RepoNotReady; any other codebase is left untouched.
//
// The step is postponed (ErrPostpone) while the Gerrit project is missing or
// its status is not "OK". Once the registry template has been pushed, the git
// URL path of the codebase is set to "/<name>" and the codebase is updated.
func (c *Controller) updateImportRepo(ctx context.Context, instance *codebaseService.Codebase) error {
	urlPath := instance.Spec.GitUrlPath
	repoPending := urlPath != nil && *urlPath == codebaseService.RepoNotReady
	if !repoPending {
		return nil
	}

	prj, err := c.getGerritProject(ctx, instance.Name)

	switch {
	case service.IsErrNotFound(err):
		// the Gerrit project has not been created yet
		return ErrPostpone(DefaultRetryTimeout)
	case err != nil:
		return fmt.Errorf("unknown error, %w", err)
	case prj.Status.Value != "OK":
		// the Gerrit project exists but is not ready
		return ErrPostpone(DefaultRetryTimeout)
	}

	if err := c.pushRegistryTemplate(ctx, instance); err != nil {
		return fmt.Errorf("unable to push registry template, %w", err)
	}

	readyPath := fmt.Sprintf("/%s", instance.Name)
	instance.Spec.GitUrlPath = &readyPath

	if err := c.k8sClient.Update(ctx, instance); err != nil {
		return fmt.Errorf("unable to update codebase, %w", err)
	}

	return nil
}
// pushRegistryTemplate fills the registry repository of the codebase with the
// contents of its template repository and the initial values.
//
// Steps: a per-call temp folder is created, the template is cloned into it and
// the registry repository is added as a remote, the registry remote is pulled,
// cached files and merged values are staged and committed (when anything
// changed) on behalf of the codebase creator, and finally all branches and tags
// are pushed to the registry remote.
//
// When the pull reports a missing reference the default branch is prepared
// with replaceDefaultBranch; when it reports a non-fast-forward update the
// function stops without error and nothing is pushed.
func (c *Controller) pushRegistryTemplate(ctx context.Context, instance *codebaseService.Codebase) error {
	reposPath, err := PrepareControllerTempFolder(c.cnf.TempFolder, "repos")
	if err != nil {
		return fmt.Errorf("unable to create repos folder, %w", err)
	}
	// The folder is created for this call only and nothing refers to it after
	// return, so remove it to avoid piling up one folder per reconciliation.
	// Registered first, it runs after the git service clean-up below.
	defer func() {
		if err := os.RemoveAll(reposPath); err != nil {
			c.logger.Error(err)
		}
	}()

	privateKey, err := GetGerritPrivateKey(ctx, c.k8sClient, c.cnf)
	if err != nil {
		return fmt.Errorf("unable to get gerrit private key, %w", err)
	}

	gitService := git.Make(path.Join(reposPath, instance.Name), c.cnf.GitUsername, privateKey)
	defer func() {
		if err := gitService.Clean(); err != nil {
			c.logger.Error(err)
		}
	}()

	if err := c.initCodebaseRepo(instance, gitService); err != nil {
		return fmt.Errorf("unable to init codebase repo, %w", err)
	}

	_, err = gitService.Pull(registryRemoteName)

	switch {
	case git.IsErrReferenceNotFound(err):
		if err := c.replaceDefaultBranch(instance, gitService); err != nil {
			return fmt.Errorf("unable to replace default branch, %w", err)
		}
	case git.IsErrNonFastForwardUpdate(err):
		// NOTE(review): presumably the registry repository already has its
		// own history, so there is nothing to seed — confirm.
		return nil
	case err != nil:
		return fmt.Errorf("unable to pull, %w", err)
	}

	cachedToCommit, err := SetCachedFiles(instance.Name, c.appCache, gitService)
	if err != nil {
		return fmt.Errorf("unable to set cached files, %w", err)
	}

	valuesToCommit, err := updateRegistryValues(instance, gitService)
	if err != nil {
		return fmt.Errorf("unable to update registry values, %w", err)
	}

	if cachedToCommit || valuesToCommit {
		author := &git.User{
			Name:  instance.Annotations[registry.AnnotationCreatorUsername],
			Email: instance.Annotations[registry.AnnotationCreatorEmail],
		}

		if err := gitService.RawCommit(author, "set initial values.yaml from admin console"); err != nil {
			return fmt.Errorf("unable to commit values, %w", err)
		}
	}

	if err := gitService.Push(registryRemoteName, "--all"); err != nil {
		return fmt.Errorf("unable to push [all] changes to repo registry, %w", err)
	}

	if err := gitService.Push(registryRemoteName, "--tags"); err != nil {
		return fmt.Errorf("unable to push tags to repo registry, %w", err)
	}

	return nil
}
// replaceDefaultBranch prepares the default branch of the cloned repository.
//
// Without a branch to copy, the default branch is simply checked out. With
// one, that branch is checked out, the existing default branch is removed and
// the default branch is checked out again with the second Checkout argument
// set to true.
// NOTE(review): the flag presumably means "create the branch" — confirm against
// git.Service.Checkout.
func (c *Controller) replaceDefaultBranch(instance *codebaseService.Codebase, gitService *git.Service) error {
	sourceBranch := instance.Spec.BranchToCopyInDefaultBranch
	defaultBranch := instance.Spec.DefaultBranch

	if sourceBranch == "" {
		return gitService.Checkout(defaultBranch, false)
	}

	err := gitService.Checkout(sourceBranch, false)
	if err != nil {
		return fmt.Errorf("unable to checkout, %w", err)
	}

	err = gitService.RemoveBranch(defaultBranch)
	if err != nil {
		return fmt.Errorf("unable to remove default branch, %w", err)
	}

	err = gitService.Checkout(defaultBranch, true)
	if err != nil {
		return fmt.Errorf("unable to copy to default branch, %w", err)
	}

	return nil
}
// initCodebaseRepo clones the template repository named by the template
// annotation of the codebase and adds the registry repository of the codebase
// as the registryRemoteName remote. Both repositories are addressed relative to
// the Gerrit SSH URL. It fails when the template annotation is absent.
func (c *Controller) initCodebaseRepo(instance *codebaseService.Codebase, gitService *git.Service) error {
	baseURL := GerritSSHURL(c.cnf)
	repoURL := func(repo string) string {
		return fmt.Sprintf("%s/%s", baseURL, repo)
	}

	templateName, hasTemplate := instance.Annotations[registry.AnnotationTemplateName]
	if !hasTemplate {
		return errors.New("template annotation not found")
	}

	err := gitService.Clone(repoURL(templateName))
	if err != nil {
		return fmt.Errorf("unable to clone repo, %w", err)
	}

	err = gitService.AddRemote(registryRemoteName, repoURL(instance.Name))
	if err != nil {
		return fmt.Errorf("unable to add registry remote, %w", err)
	}

	return nil
}
// PrepareControllerTempFolder creates a fresh working folder named
// "<controllerFolder>-<unix nanoseconds>" inside tempDir and returns its path.
// Missing parent folders are created as well. Should a folder with that name
// already exist, it is removed first so the returned folder is always empty.
func PrepareControllerTempFolder(tempDir, controllerFolder string) (string, error) {
	folderName := fmt.Sprintf("%s-%d", controllerFolder, time.Now().UnixNano())
	target := path.Join(tempDir, folderName)

	_, statErr := os.Stat(target)
	alreadyExists := statErr == nil

	if alreadyExists {
		if err := os.RemoveAll(target); err != nil {
			return "", fmt.Errorf("unable to clear repos folder, %w", err)
		}
	}

	if err := os.MkdirAll(target, os.ModePerm); err != nil {
		return "", fmt.Errorf("unable to create repo folder, %w", err)
	}

	return target, nil
}
// GetGerritPrivateKey reads the secret named cnf.GitKeySecretName from the
// cnf.Namespace namespace and returns the value stored under keySecretIndex as
// a string. It fails when the secret cannot be fetched or has no such key.
func GetGerritPrivateKey(ctx context.Context, k8sClient client.Client, cnf *config.Settings) (string, error) {
	secretRef := types.NamespacedName{
		Namespace: cnf.Namespace,
		Name:      cnf.GitKeySecretName,
	}

	var gerritSecret v1.Secret

	err := k8sClient.Get(ctx, secretRef, &gerritSecret)
	if err != nil {
		return "", fmt.Errorf("unable to get gerrit project creator secret, %w", err)
	}

	if privateKey, found := gerritSecret.Data[keySecretIndex]; found {
		return string(privateKey), nil
	}

	return "", fmt.Errorf("no data by key %s in gerrit secret", keySecretIndex)
}
// SetCachedFiles copies the files cached for projectName into the repository
// handled by gitService and stages them.
//
// The cache entry is looked up under registry.CachedFilesIndex(projectName). A
// missing entry is not an error. An entry of an unexpected type is deleted from
// the cache and reported as an error. A file is skipped when the repository
// already holds identical contents; a failure to read the repository copy is
// treated as "different" and the file is written.
//
// It returns true when at least one file was written and staged.
func SetCachedFiles(projectName string, appCache *cache.Cache, gitService *git.Service) (bool, error) {
	//TODO: remove cached files
	cacheKey := registry.CachedFilesIndex(projectName)

	cached, found := appCache.Get(cacheKey)
	if !found {
		return false, nil
	}

	pending, isFileList := cached.([]registry.CachedFile)
	if !isFileList {
		appCache.Delete(cacheKey)
		return false, fmt.Errorf("wrong files type, %+v", cached)
	}

	anyStaged := false

	for _, file := range pending {
		raw, err := os.ReadFile(file.TempPath)
		if err != nil {
			return false, fmt.Errorf("unable to read file, %w", err)
		}
		wanted := string(raw)

		if existing, err := gitService.GetFileContents(file.RepoPath); err == nil && existing == wanted {
			continue
		}

		if err := gitService.SetFileContents(file.RepoPath, wanted); err != nil {
			return false, fmt.Errorf("unable to set file contents, %w", err)
		}

		if err := gitService.Add(file.RepoPath); err != nil {
			return false, fmt.Errorf("unable to add file to git, %w", err)
		}

		anyStaged = true
	}

	return anyStaged, nil
}
func updateRegistryValues(instance *codebaseService.Codebase, gitService *git.Service) (bool, error) {
currentValuesValuesStr, err := gitService.GetFileContents(registry.ValuesLocation)
if err != nil {
return false, fmt.Errorf("unable to get values from repo, %w", err)
}
var currentValues map[string]interface{}
if err := yaml.Unmarshal([]byte(currentValuesValuesStr), ¤tValues); err != nil {
return false, fmt.Errorf("unable to decode values, %w", err)
}
if currentValues == nil {
currentValues = make(map[string]interface{})
}
var instanceValues map[string]interface{}
if err := json.Unmarshal([]byte(instance.Annotations[registry.AnnotationValues]), &instanceValues); err != nil {
return false, fmt.Errorf("unable to decode codebase values, %w", err)
}
mergedValues := MergeMaps(currentValues, instanceValues)
bts, err := yaml.Marshal(mergedValues)
if err != nil {
return false, fmt.Errorf("unable to encode values yaml, %w", err)
}
newContents := string(bts)
if newContents != currentValuesValuesStr {
if err := gitService.SetFileContents(registry.ValuesLocation, newContents); err != nil {
return false, fmt.Errorf("unable to set values yaml file contents, %w", err)
}
if err := gitService.Add(registry.ValuesLocation); err != nil {
return false, fmt.Errorf("unable to add values file to git, %w", err)
}
return true, nil
}
return false, nil
}
// MergeMaps returns a new map holding the entries of a overlaid with the
// entries of b.
//
// When a key is present in both maps and both values are
// map[string]interface{}, the two nested maps are merged recursively. In every
// other case the value from b replaces the value from a. Neither a nor b is
// modified, but values that are not merged are shared with the result rather
// than copied.
func MergeMaps(a, b map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{}, len(a))
	for key, value := range a {
		merged[key] = value
	}

	for key, override := range b {
		overrideMap, overrideIsMap := override.(map[string]interface{})
		baseMap, baseIsMap := merged[key].(map[string]interface{})

		if overrideIsMap && baseIsMap {
			merged[key] = MergeMaps(baseMap, overrideMap)
			continue
		}

		merged[key] = override
	}

	return merged
}
// getGerritProject returns the Gerrit project resource whose spec name equals
// name.
//
// All Gerrit projects visible to the client are listed (no list options are
// passed to the client) and the first match is returned. service.ErrNotFound
// is returned when no project matches.
func (c *Controller) getGerritProject(ctx context.Context, name string) (*gerritService.GerritProject, error) {
	var projectList gerritService.GerritProjectList
	if err := c.k8sClient.List(ctx, &projectList); err != nil {
		return nil, fmt.Errorf("unable to list gerrit projects, %w", err)
	}

	// Index the slice instead of taking the address of the range variable:
	// this avoids copying every item and the implicit aliasing of the loop
	// variable.
	for i := range projectList.Items {
		if projectList.Items[i].Spec.Name == name {
			return &projectList.Items[i], nil
		}
	}

	return nil, service.ErrNotFound("unable to find gerrit project")
}