controller/merge_request/controller.go (527 lines of code) (raw):
package merge_request
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path"
"reflect"
"strings"
"time"
"github.com/patrickmn/go-cache"
"gopkg.in/yaml.v3"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
k8sController "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"ddm-admin-console/app/registry"
"ddm-admin-console/config"
"ddm-admin-console/controller"
"ddm-admin-console/controller/codebase"
codebaseSvc "ddm-admin-console/service/codebase"
gerritService "ddm-admin-console/service/gerrit"
"ddm-admin-console/service/git"
"ddm-admin-console/service/gitserver"
"ddm-admin-console/service/jenkins"
)
// Controller reconciles GerritMergeRequest resources: it prepares branch-merge
// requests, auto-approves labelled requests, triggers the Jenkins job
// provisioner, pushes cached files and tracks build job status.
type Controller struct {
	// logger receives the structured log output of every reconcile step.
	logger controller.Logger
	// mgr is the controller-runtime manager handed to Make; it is stored but
	// not read anywhere else in this file.
	mgr ctrl.Manager
	// k8sClient is mgr.GetClient(); used to get, update, create and delete
	// merge request resources and to update their status.
	k8sClient client.Client
	// cnf supplies TempFolder, GitUsername and RegistryVersionFilter, and is
	// passed to the codebase helpers that build the Gerrit SSH URL and load
	// the Gerrit private key.
	cnf *config.Settings
	// gerrit is used to load change details and to approve and submit changes.
	gerrit gerritService.ServiceInterface
	// appCache holds the per-project cached files looked up by
	// registry.CachedFilesIndex.
	appCache *cache.Cache
	// codebaseService loads and updates the project codebase and lists its branches.
	codebaseService codebaseSvc.ServiceInterface
	// gitServerService loads the git server referenced by the codebase spec.
	gitServerService gitserver.ServiceInterface
	// jenkinsService reads build job statuses and starts job provisioner runs.
	jenkinsService jenkins.ServiceInterface
	// versionFilter is built in Make from cnf.RegistryVersionFilter and passed
	// to codebase.ProcessRegistryVersion to decide whether a request is handled.
	versionFilter *registry.VersionFilter
}
// Make assembles a merge request Controller from the given dependencies and
// registers it with mgr as the reconciler for GerritMergeRequest resources.
//
// Update events are filtered through isSpecUpdated and reconciles are limited
// to one at a time. An error is returned when the registry version filter
// cannot be built or the controller cannot be registered.
func Make(
	mgr ctrl.Manager,
	logger controller.Logger,
	cnf *config.Settings,
	gerrit gerritService.ServiceInterface,
	cbService codebaseSvc.ServiceInterface,
	gitServerService gitserver.ServiceInterface,
	jenkinsService jenkins.ServiceInterface,
	appCache *cache.Cache,
) error {
	mrController := Controller{
		mgr:              mgr,
		logger:           logger,
		k8sClient:        mgr.GetClient(),
		cnf:              cnf,
		gerrit:           gerrit,
		appCache:         appCache,
		codebaseService:  cbService,
		gitServerService: gitServerService,
		jenkinsService:   jenkinsService,
	}

	filter, err := registry.MakeVersionFilter(cnf.RegistryVersionFilter)
	if err != nil {
		return fmt.Errorf("unable to init version filter, %w", err)
	}
	mrController.versionFilter = filter

	// Only the update predicate is customised; the remaining event kinds keep
	// the predicate.Funcs defaults.
	updateFilter := predicate.Funcs{UpdateFunc: isSpecUpdated}
	options := k8sController.Options{MaxConcurrentReconciles: 1}

	err = ctrl.NewControllerManagedBy(mgr).
		For(&gerritService.GerritMergeRequest{}, builder.WithPredicates(updateFilter)).
		WithOptions(options).
		Complete(&mrController)
	if err != nil {
		return fmt.Errorf("unable to create controller, err: %w", err)
	}

	return nil
}
// isSpecUpdated is the update predicate for merge requests. Despite its name
// it reports true when the spec changed, when the status changed, or when the
// new object has a deletion timestamp and the old one did not.
//
// Both objects of the event must be *GerritMergeRequest; any other type makes
// the type assertions panic.
func isSpecUpdated(e event.UpdateEvent) bool {
	previous := e.ObjectOld.(*gerritService.GerritMergeRequest)
	current := e.ObjectNew.(*gerritService.GerritMergeRequest)

	if !reflect.DeepEqual(previous.Spec, current.Spec) {
		return true
	}

	if !reflect.DeepEqual(previous.Status, current.Status) {
		return true
	}

	// Deletion has just been requested.
	return previous.GetDeletionTimestamp().IsZero() && !current.GetDeletionTimestamp().IsZero()
}
// Reconcile is the controller-runtime entry point for a merge request.
//
// It delegates to reconcile and never returns an error to the manager:
// failures are logged and turned into a delayed requeue. When the failure
// chain contains a codebase.ErrPostpone, its duration is used as the delay;
// any other failure is retried after codebase.DefaultRetryTimeout.
func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	c.logger.Infow("reconciling merge request", "Request.Namespace", request.Namespace,
		"Request.Name", request.Name)

	if err := c.reconcile(ctx, request); err != nil {
		c.logger.Errorw(err.Error(), "Request.Namespace", request.Namespace, "Request.Name", request.Name)

		// errors.As walks the whole chain, so the postpone duration is found
		// no matter how many times the error was wrapped. An unchecked type
		// assertion on a single errors.Unwrap would panic on any other depth.
		var postpone codebase.ErrPostpone
		if codebase.IsErrPostpone(err) && errors.As(err, &postpone) {
			return reconcile.Result{RequeueAfter: postpone.D()}, nil
		}

		return reconcile.Result{RequeueAfter: codebase.DefaultRetryTimeout}, nil
	}

	c.logger.Infow("reconciling merge request done", "Request.Namespace", request.Namespace,
		"Request.Name", request.Name)

	return reconcile.Result{}, nil
}
// reconcile runs the merge request pipeline for a single request.
//
// A merge request that no longer exists is logged and treated as success.
// Otherwise the project codebase is loaded and checked against the registry
// version filter; a filtered-out request is skipped. The remaining steps run
// in order and the first failure aborts the pipeline:
// prepareMergeRequest, autoApproveMergeRequest, triggerJobProvisioner,
// addCachedFiles, checkBuildJobStatus.
//
// Step errors are wrapped with %w, so a postpone error returned by a step
// stays reachable for the caller.
func (c *Controller) reconcile(ctx context.Context, request reconcile.Request) error {
	var instance gerritService.GerritMergeRequest
	if err := c.k8sClient.Get(ctx, request.NamespacedName, &instance); err != nil {
		if k8sErrors.IsNotFound(err) {
			c.logger.Infow("instance not found", "Request.Namespace", request.Namespace, "Request.Name", request.Name)

			return nil
		}

		return fmt.Errorf("unable to get merge request from k8s, err: %w", err)
	}

	cb, err := c.codebaseService.Get(instance.Spec.ProjectName)
	if err != nil {
		return fmt.Errorf("unable to get project codebase, %w", err)
	}

	processRequest, err := codebase.ProcessRegistryVersion(ctx, c.versionFilter, cb, c.gerrit)
	if err != nil {
		return fmt.Errorf("unable to process registry version, err: %w", err)
	}

	if !processRequest {
		c.logger.Infow("reconciling merge request skipped, wrong registry version",
			"Request.Namespace", instance.Namespace, "Request.Name", instance.Name)

		return nil
	}

	if err := c.prepareMergeRequest(ctx, &instance); err != nil {
		return fmt.Errorf("unable to prepare merge request, err: %w", err)
	}

	if err := c.autoApproveMergeRequest(ctx, &instance); err != nil {
		return fmt.Errorf("unable to approve MR, err: %w", err)
	}

	if err := c.triggerJobProvisioner(ctx, &instance, cb); err != nil {
		return fmt.Errorf("unable to triggerJobProvisioner MR, err: %w", err)
	}

	if err := c.addCachedFiles(ctx, &instance); err != nil {
		return fmt.Errorf("unable to proceed cached files, %w", err)
	}

	if err := c.checkBuildJobStatus(ctx, &instance, cb); err != nil {
		return fmt.Errorf("unable to check build status, %w", err)
	}

	return nil
}
// checkBuildJobStatus postpones a new merge request while any build job of the
// codebase has not succeeded.
//
// It does nothing unless the merge request status is StatusNew. For every
// branch of the codebase the Jenkins build job status is loaded; on the first
// job that is not StatusSuccess the codebase is annotated with
// StatusAnnotationRunningJobs, updated, and a postpone error with
// codebase.DefaultRetryTimeout is returned. When all jobs succeeded, a
// leftover running-jobs annotation is removed and the codebase updated.
func (c *Controller) checkBuildJobStatus(ctx context.Context, instance *gerritService.GerritMergeRequest, cb *codebaseSvc.Codebase) error {
	if instance.Status.Value != gerritService.StatusNew {
		return nil
	}

	branches, err := c.codebaseService.GetBranchesByCodebase(ctx, cb.Name)
	if err != nil {
		return fmt.Errorf("unable to get branches, %w", err)
	}

	for _, b := range branches {
		branchName := strings.ToUpper(b.Spec.BranchName)

		status, _, err := c.jenkinsService.GetJobStatus(ctx, fmt.Sprintf("%s/view/%s/job/%s-Build-%s", b.Spec.CodebaseName,
			branchName, branchName, b.Spec.CodebaseName))
		if err != nil {
			return fmt.Errorf("unable to load job status, %w", err)
		}

		if status != jenkins.StatusSuccess {
			// A codebase without any annotations has a nil map; writing into
			// it would panic, so allocate it first.
			if cb.Annotations == nil {
				cb.Annotations = make(map[string]string)
			}

			cb.Annotations[codebaseSvc.StatusAnnotation] = codebaseSvc.StatusAnnotationRunningJobs
			if err := c.codebaseService.Update(ctx, cb); err != nil {
				return fmt.Errorf("unable to update instance, %w", err)
			}

			return codebase.ErrPostpone(codebase.DefaultRetryTimeout)
		}
	}

	// All jobs succeeded: clear the marker if an earlier pass left it behind.
	if annotationStatus, ok := cb.Annotations[codebaseSvc.StatusAnnotation]; ok &&
		annotationStatus == codebaseSvc.StatusAnnotationRunningJobs {
		delete(cb.Annotations, codebaseSvc.StatusAnnotation)

		if err := c.codebaseService.Update(ctx, cb); err != nil {
			return fmt.Errorf("unable to update instance, %w", err)
		}
	}

	return nil
}
// triggerJobProvisioner starts the Jenkins job provisioner for a merged merge
// request that carries the backup-schedule action.
//
// It does nothing unless the merge request status is StatusMerged and the
// MRAnnotationActions annotation (a JSON array of strings) contains
// MRActionBackupSchedule. After the job run is created, the action is removed
// from the annotation and the merge request is updated so the job is not
// triggered again.
//
// An error is returned when the annotation is not valid JSON, the codebase has
// no job provisioning configured, the git server cannot be loaded, the job run
// cannot be created, or the merge request cannot be updated.
func (c *Controller) triggerJobProvisioner(
	ctx context.Context,
	instance *gerritService.GerritMergeRequest,
	cb *codebaseSvc.Codebase,
) error {
	if instance.Status.Value != gerritService.StatusMerged {
		return nil
	}

	js, ok := instance.Annotations[registry.MRAnnotationActions]
	if !ok {
		return nil
	}

	var actions []string
	if err := json.Unmarshal([]byte(js), &actions); err != nil {
		return fmt.Errorf("unable to unmarshal actions, %w", err)
	}

	backupSchedule := false

	for _, a := range actions {
		if a == registry.MRActionBackupSchedule {
			backupSchedule = true

			break
		}
	}

	if !backupSchedule {
		return nil
	}

	if cb.Spec.JobProvisioning == nil {
		return fmt.Errorf("project has no job provisioning")
	}

	gitServer, err := c.gitServerService.Get(cb.Spec.GitServer)
	if err != nil {
		return fmt.Errorf("failed to get gitService CR: %w", err)
	}

	if err := c.jenkinsService.CreateJobBuildRun(
		ctx,
		fmt.Sprintf("backup-schedule-%d", time.Now().Unix()),
		fmt.Sprintf("/job-provisions/job/ci/job/%s", *cb.Spec.JobProvisioning),
		map[string]string{
			"NAME":                     cb.Name,
			"TYPE":                     cb.Spec.Type,
			"BUILD_TOOL":               cb.Spec.BuildTool,
			"BRANCH":                   cb.Spec.DefaultBranch,
			"GIT_SERVER_CR_NAME":       cb.Spec.GitServer,
			"GIT_SERVER_CR_VERSION":    "v2",
			"GIT_CREDENTIALS_ID":       "gerrit-ciuser-sshkey",
			"REPOSITORY_PATH":          fmt.Sprintf("ssh://%s@gerrit:31000/%s", gitServer.Spec.GitUser, cb.Name),
			"JIRA_INTEGRATION_ENABLED": "false",
		},
	); err != nil {
		return fmt.Errorf("unable to create job build run, %w", err)
	}

	// Start from an empty, non-nil slice: a nil slice would be marshalled as
	// JSON null instead of an empty array when no other action remains.
	replaceActions := make([]string, 0, len(actions))

	for _, a := range actions {
		if a != registry.MRActionBackupSchedule {
			replaceActions = append(replaceActions, a)
		}
	}

	bts, err := json.Marshal(replaceActions)
	if err != nil {
		return fmt.Errorf("unable to marshal json, %w", err)
	}

	instance.Annotations[registry.MRAnnotationActions] = string(bts)

	if err := c.k8sClient.Update(ctx, instance); err != nil {
		return fmt.Errorf("unable to update MR instance, %w", err)
	}

	return nil
}
// addCachedFiles applies the files cached for the project to the open Gerrit
// change of a new merge request.
//
// It does nothing unless the merge request status is StatusNew and the cache
// holds a []registry.CachedFile entry for the project. A cache entry of any
// other type is dropped and logged. Otherwise the project is cloned, the ref
// of the change is pulled, the cached files are applied through
// codebase.SetCachedFiles and, if that reports a change, the commit is amended
// and pushed to refs/for/<target branch>.
func (c *Controller) addCachedFiles(ctx context.Context, instance *gerritService.GerritMergeRequest) error {
	if instance.Status.Value != gerritService.StatusNew {
		return nil
	}

	key := registry.CachedFilesIndex(instance.Spec.ProjectName)

	cached, found := c.appCache.Get(key)
	if !found {
		// Nothing cached for this project; this is the common case and not a
		// type problem, so there is nothing to delete or report.
		return nil
	}

	if _, ok := cached.([]registry.CachedFile); !ok {
		c.appCache.Delete(key)
		c.logger.Infow("wrong cached files type", "Request.Namespace", instance.Namespace,
			"Request.Name", instance.Name)

		return nil
	}

	_, _, projectPath, err := prepareControllerFolders(c.cnf.TempFolder, instance.Spec.ProjectName)
	if err != nil {
		return fmt.Errorf("unable to prepare controller tmp folders, %w", err)
	}

	gitService, err := c.initGitService(ctx, projectPath)
	if err != nil {
		return fmt.Errorf("unable to init git service, %w", err)
	}

	if err := gitService.Clone(fmt.Sprintf("%s/%s", codebase.GerritSSHURL(c.cnf), instance.Spec.ProjectName)); err != nil {
		return fmt.Errorf("unable to clone repo, %w", err)
	}

	detail, err := c.gerrit.GetChangeDetails(instance.Status.ChangeID)
	if err != nil {
		return fmt.Errorf("unable to get change details, %w", err)
	}

	var (
		ref           string
		commitMessage string
	)

	// The last revision visited wins.
	// NOTE(review): if Revisions is a map holding more than one revision the
	// iteration order is random, so the chosen ref is too — confirm that the
	// details only ever carry the current revision.
	for _, v := range detail.Revisions {
		ref = v.Ref
		commitMessage = v.Commit.Message
	}

	if ref == "" {
		return errors.New("empty ref")
	}

	if commitMessage == "" {
		commitMessage = fmt.Sprintf("edit registry\n\nChange-Id: %s", instance.Status.ChangeID)
	}

	if err := gitService.RawPull("origin", ref); err != nil {
		return fmt.Errorf("unable to pull refs, %w", err)
	}

	changed, err := codebase.SetCachedFiles(instance.Spec.ProjectName, c.appCache, gitService)
	if err != nil {
		return fmt.Errorf("unable to set cached files, %w", err)
	}

	if changed {
		// Amend instead of adding a commit so the existing change is updated.
		if err := gitService.RawCommit(&git.User{Name: instance.Spec.AuthorName, Email: instance.Spec.AuthorEmail},
			commitMessage, "--amend"); err != nil {
			return fmt.Errorf("unable to commit, %w", err)
		}

		if err := gitService.Push("origin", fmt.Sprintf("HEAD:refs/for/%s", instance.TargetBranch())); err != nil {
			return fmt.Errorf("unable to push refs, %w", err)
		}
	}

	return nil
}
// autoApproveMergeRequest approves and submits the Gerrit change of a merge
// request labelled for automatic approval.
//
// It does nothing unless the merge request has a change ID, its status is
// StatusNew and the MRLabelApprove label equals MRLabelApproveAuto. After the
// change is approved and submitted on behalf of the spec author, the merge
// request status is set to StatusMerged and persisted.
func (c *Controller) autoApproveMergeRequest(ctx context.Context, instance *gerritService.GerritMergeRequest) error {
	if instance.Status.ChangeID == "" || instance.Status.Value != gerritService.StatusNew {
		return nil
	}

	label, ok := instance.Labels[registry.MRLabelApprove]
	if !ok || label != registry.MRLabelApproveAuto {
		return nil
	}

	if err := c.gerrit.ApproveAndSubmitChange(instance.Status.ChangeID, instance.Spec.AuthorName,
		instance.Spec.AuthorEmail); err != nil {
		return fmt.Errorf("unable to approve and submit change, err: %w", err)
	}

	instance.Status.Value = gerritService.StatusMerged

	if err := c.k8sClient.Status().Update(ctx, instance); err != nil {
		return fmt.Errorf("unable to update MR status, err: %w", err)
	}

	return nil
}
// prepareMergeRequest rebuilds the source branch of a branch-merge request on
// top of the target branch and then replaces the merge request CR with a copy
// that has Spec.SourceBranch set.
//
// It is a no-op unless the MRLabelAction label equals MRLabelActionBranchMerge
// and both Spec.SourceBranch and Status.ChangeID are still empty.
//
// Steps:
//  1. clone the repo
//  2. check out the target branch
//  3. back up values.yaml from the target branch
//  4. check out the source branch
//  5. back up the source branch working tree (without .git)
//  6. check out the target branch and delete the source branch
//  7. recreate the source branch from the target branch
//  8. restore the source branch files from the backup
//  9. merge the backed-up target values.yaml with the source branch values.yaml
//  10. commit with a generated change ID and force-push the source branch
//  11. create a renamed copy of the merge request CR with Spec.SourceBranch set
//     (to pass it to the gerrit operator) and delete the original CR
//
// NOTE(review): the original notes said values.yaml is backed up "if it
// exists", but CopyFile returns an error when the source file is missing, so a
// target branch without values.yaml aborts the whole preparation — confirm
// which behavior is intended.
//
// NOTE(review): if creating the copy succeeds but deleting the original fails,
// the next reconcile repeats all steps and creates another copy.
//
// TODO: move this logic to registry upgrade app, to remove MR duplication
func (c *Controller) prepareMergeRequest(ctx context.Context, instance *gerritService.GerritMergeRequest) error {
	// Only untouched branch-merge requests are prepared.
	if instance.Labels[registry.MRLabelAction] != registry.MRLabelActionBranchMerge ||
		instance.Spec.SourceBranch != "" || instance.Status.ChangeID != "" {
		c.logger.Infow("nothing need to be done", "Request.Namespace", instance.Namespace,
			"Request.Name", instance.Name)
		return nil
	}
	_, backupFolderPath, projectPath, err := prepareControllerFolders(c.cnf.TempFolder, instance.Spec.ProjectName)
	if err != nil {
		return fmt.Errorf("unable to prepare controller tmp folders, err: %w", err)
	}
	gitService, err := c.initGitService(ctx, projectPath)
	if err != nil {
		return fmt.Errorf("unable to init git service, err: %w", err)
	}
	// Branch names come from the merge request labels, not from the spec.
	targetBranch, sourceBranch, err := getBranchesFromLabels(instance.Labels)
	if err != nil {
		return fmt.Errorf("unable to get branches from instance labels, err: %w", err)
	}
	if err := gitService.Clone(fmt.Sprintf("%s/%s", codebase.GerritSSHURL(c.cnf), instance.Spec.ProjectName)); err != nil {
		return fmt.Errorf("unable to clone repo, err: %w", err)
	}
	if err := gitService.RawCheckout(targetBranch, false); err != nil {
		return fmt.Errorf("unable to checkout branch, err: %w", err)
	}
	// Back up the target branch values.yaml; the backup file name is the same
	// for every project.
	valuesBackupPath := path.Join(backupFolderPath, "backup-values.yaml")
	projectValuesPath := path.Join(projectPath, registry.ValuesLocation)
	if err := CopyFile(projectValuesPath, valuesBackupPath); err != nil {
		return fmt.Errorf("unable to backup values yaml, err: %w", err)
	}
	if err := gitService.RawCheckout(sourceBranch, false); err != nil {
		return fmt.Errorf("unable to checkout, err: %w", err)
	}
	// Back up the source branch working tree (backupProject strips .git).
	projectBackupPath, err := backupProject(backupFolderPath, projectPath, instance.Spec.ProjectName)
	if err != nil {
		return fmt.Errorf("unable to backup source branch, err: %w", err)
	}
	// Switch to the target branch so the source branch can be deleted.
	if err := gitService.RawCheckout(targetBranch, false); err != nil {
		return fmt.Errorf("unable to checkout, err: %w", err)
	}
	// Delete the source branch.
	if err := gitService.DeleteBranch(sourceBranch); err != nil {
		return fmt.Errorf("unable to delete source branch, err: %w", err)
	}
	// Recreate the source branch from the target branch; the second argument
	// presumably requests branch creation (checkout -b) — confirm in git.Service.
	if err := gitService.RawCheckout(sourceBranch, true); err != nil {
		return fmt.Errorf("unable to checkout to source branch, err: %w", err)
	}
	// Restore the source branch files over the fresh branch; the "/." suffix
	// makes cp copy the directory contents rather than the directory itself.
	if err := CopyFolder(fmt.Sprintf("%s/.", projectBackupPath), fmt.Sprintf("%s/", projectPath)); err != nil {
		return fmt.Errorf("unable to restore source branch, err: %w", err)
	}
	// Merge the backed-up target branch values into the restored values.yaml.
	if err := MergeValuesFiles(valuesBackupPath, projectValuesPath); err != nil {
		return fmt.Errorf("unable to merge values, err: %w", err)
	}
	// Stage all changes.
	if err := gitService.Add("."); err != nil {
		return fmt.Errorf("unable to add all files, err: %w", err)
	}
	changeID, err := gitService.GenerateChangeID()
	if err != nil {
		return fmt.Errorf("unable to generate change id, err: %w", err)
	}
	// A "nothing to commit" failure is tolerated: the branches may already match.
	if err := gitService.RawCommit(&git.User{Name: instance.Spec.AuthorName, Email: instance.Spec.AuthorEmail},
		git.CommitMessageWithChangeID(
			fmt.Sprintf("Add new branch %s\n\nupdate branch values.yaml from [%s] branch", sourceBranch,
				targetBranch),
			changeID)); err != nil && !strings.Contains(err.Error(), "nothing to commit") {
		return fmt.Errorf("unable to commit changes, %w", err)
	}
	// The branch was recreated, so the remote branch has to be overwritten.
	if err := gitService.Push("origin", fmt.Sprintf("refs/heads/%s:%s", sourceBranch, sourceBranch), "--force"); err != nil {
		return fmt.Errorf("unable to push repo, err: %w", err)
	}
	// Reload the CR and create a copy under a new, timestamped name with the
	// source branch set; the resource version is cleared so Create accepts it.
	var reloadInstance gerritService.GerritMergeRequest
	if err := c.k8sClient.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, &reloadInstance); err != nil {
		return fmt.Errorf("unable to reload instance, err: %w", err)
	}
	reloadInstance.Spec.SourceBranch = sourceBranch
	reloadInstance.Name = fmt.Sprintf("%s-update-%d", instance.Spec.ProjectName, time.Now().Unix())
	reloadInstance.ResourceVersion = ""
	if err := c.k8sClient.Create(ctx, &reloadInstance); err != nil {
		return fmt.Errorf("unable to create duplicate instance, %w", err)
	}
	// The original CR is replaced by the copy.
	if err := c.k8sClient.Delete(ctx, instance); err != nil {
		return fmt.Errorf("unable to delete old mr CR, %w", err)
	}
	return nil
}
// backupProject copies the project working tree at projectPath to
// <backupFolderPath>/backup-<projectName>, removes the .git folder from the
// copy and returns the path of the backup.
//
// Any backup left over from an earlier run is removed first: cp -r into an
// existing directory would nest the new copy inside it and keep stale files,
// which would later be restored into the project.
// NOTE(review): this is a no-op if the backup folder is always recreated by
// codebase.PrepareControllerTempFolder — confirm.
func backupProject(backupFolderPath, projectPath, projectName string) (string, error) {
	projectBackupPath := path.Join(backupFolderPath, fmt.Sprintf("backup-%s", projectName))

	// os.RemoveAll succeeds when the path does not exist.
	if err := os.RemoveAll(projectBackupPath); err != nil {
		return "", fmt.Errorf("unable to remove stale backup, err: %w", err)
	}

	if err := CopyFolder(projectPath, projectBackupPath); err != nil {
		return "", fmt.Errorf("unable to backup source branch, err: %w", err)
	}

	if err := os.RemoveAll(path.Join(projectBackupPath, ".git")); err != nil {
		return "", fmt.Errorf("unable to remove .git folder from backup, err: %w", err)
	}

	return projectBackupPath, nil
}
// initGitService returns a git service working in projectPath, authenticated
// with the configured git username and the Gerrit private key loaded through
// codebase.GetGerritPrivateKey.
func (c *Controller) initGitService(ctx context.Context, projectPath string) (*git.Service, error) {
	key, keyErr := codebase.GetGerritPrivateKey(ctx, c.k8sClient, c.cnf)
	if keyErr != nil {
		return nil, fmt.Errorf("unable to get gerrit private key, err: %w", keyErr)
	}

	svc := git.Make(projectPath, c.cnf.GitUsername, key)

	return svc, nil
}
// getBranchesFromLabels reads the target and source branch names from the
// merge request labels. It returns an error when either label is absent; the
// target branch is checked first and is still returned when only the source
// branch label is missing.
func getBranchesFromLabels(labels map[string]string) (targetBranch, sourceBranch string, err error) {
	target, hasTarget := labels[registry.MRLabelTargetBranch]
	if !hasTarget {
		return target, "", errors.New("target branch is not specified")
	}

	source, hasSource := labels[registry.MRLabelSourceBranch]
	if !hasSource {
		return target, source, errors.New("source branch is not specified")
	}

	return target, source, nil
}
// prepareControllerFolders prepares the two temp folders used by this
// controller under tempFolder, "merge-requests" and "mr-backup", and derives
// the project checkout path as <merge-requests folder>/<projectName>.
//
// On failure the paths obtained so far are returned together with the error.
func prepareControllerFolders(tempFolder, projectName string) (controllerFolderPath, backupFolderPath, projectPath string, retErr error) {
	mrFolder, err := codebase.PrepareControllerTempFolder(tempFolder, "merge-requests")
	if err != nil {
		return mrFolder, "", "", err
	}

	backupFolder, err := codebase.PrepareControllerTempFolder(tempFolder, "mr-backup")
	if err != nil {
		return mrFolder, backupFolder, "", err
	}

	return mrFolder, backupFolder, path.Join(mrFolder, projectName), nil
}
// MergeValuesFiles decodes the YAML documents at src and dst, merges them with
// codebase.MergeMaps(dstData, srcData) and overwrites dst with the result.
//
// Each input file is opened, decoded and closed before the next step, so no
// file handle stays open when an error is returned. An error is returned when
// either file cannot be opened or decoded, or when dst cannot be recreated,
// encoded or closed.
func MergeValuesFiles(src, dst string) error {
	// load decodes one YAML file into a generic map; kind ("src" or "dst")
	// only labels the error messages.
	load := func(name, kind string) (map[string]interface{}, error) {
		fp, err := os.Open(name)
		if err != nil {
			return nil, fmt.Errorf("unable to open %s file, err: %w", kind, err)
		}
		// The handle is read-only, so a close failure cannot lose data.
		defer func() { _ = fp.Close() }()

		var data map[string]interface{}
		if err := yaml.NewDecoder(fp).Decode(&data); err != nil {
			return nil, fmt.Errorf("unable to decode %s values, err: %w", kind, err)
		}

		return data, nil
	}

	srcData, err := load(src, "src")
	if err != nil {
		return err
	}

	dstData, err := load(dst, "dst")
	if err != nil {
		return err
	}

	out := codebase.MergeMaps(dstData, srcData)

	outFp, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("unable to recreate dst, err: %w", err)
	}

	if err := yaml.NewEncoder(outFp).Encode(out); err != nil {
		_ = outFp.Close()

		return fmt.Errorf("unable to encode dst data, err: %w", err)
	}

	// Closing a written file can report a failed flush, so it is checked.
	if err := outFp.Close(); err != nil {
		return fmt.Errorf("unable to close dst, err: %w", err)
	}

	return nil
}
// CopyFolder copies src to dst by running "cp -r src dst". When the command
// fails, its combined stdout and stderr output is included in the returned
// error, which wraps the command error.
func CopyFolder(src, dst string) error {
	output, runErr := exec.Command("cp", "-r", src, dst).CombinedOutput()
	if runErr == nil {
		return nil
	}

	return fmt.Errorf("unable to copy folder %s, err: %w", string(output), runErr)
}
// CopyFile copies the file at src to dst, replacing dst if it already exists.
//
// The source is opened before the destination is touched, so a missing or
// unreadable source leaves an existing destination intact. Both file handles
// are released on every return path. An error is returned when src cannot be
// opened, an existing dst cannot be removed, dst cannot be created, or the
// copy or the final close of dst fails.
func CopyFile(src, dst string) error {
	srcFp, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("unable to open file, err: %w", err)
	}
	// The handle is read-only, so a close failure cannot lose data.
	defer func() { _ = srcFp.Close() }()

	// Remove an existing destination so the copy is always a fresh file.
	if _, err := os.Stat(dst); err == nil {
		if err := os.Remove(dst); err != nil {
			return fmt.Errorf("unable to remove file, err: %w", err)
		}
	}

	dstFp, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("unable to create file, err: %w", err)
	}

	if _, err := io.Copy(dstFp, srcFp); err != nil {
		_ = dstFp.Close()

		return fmt.Errorf("unable to copy files, err: %w", err)
	}

	// Closing a written file can report a failed flush, so it is checked.
	if err := dstFp.Close(); err != nil {
		return fmt.Errorf("unable to close file, err: %w", err)
	}

	return nil
}