pkg/service/platform/k8s/k8s.go (425 lines of code) (raw):
package k8s
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"regexp"
"strconv"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
coreV1Api "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8sScheme "k8s.io/client-go/kubernetes/scheme"
appsV1Client "k8s.io/client-go/kubernetes/typed/apps/v1"
coreV1Client "k8s.io/client-go/kubernetes/typed/core/v1"
networkingClient "k8s.io/client-go/kubernetes/typed/networking/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime"
k8sClient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
keycloakApi "github.com/epam/edp-keycloak-operator/api/v1"
gerritApi "github.com/epam/edp-gerrit-operator/v2/api/v1"
"github.com/epam/edp-gerrit-operator/v2/pkg/service/gerrit/spec"
platformHelper "github.com/epam/edp-gerrit-operator/v2/pkg/service/platform/helper"
)
const (
// nameKey is the key under which a resource name is attached to structured log entries.
nameKey = "name"
// Base is the numeric base handed to strconv.ParseInt when the SSH port is parsed.
Base = 10
// BitSize is the bit size handed to strconv.ParseInt when the SSH port is parsed,
// so that the parsed value fits into an int32.
BitSize = 32
)
// log is the package-level logger; it is derived from the controller-runtime
// logger and named "platform".
var log = ctrl.Log.WithName("platform")
// K8SService implements platform.Service interface (k8s platform integration).
// It groups the Kubernetes clients used by the methods of this type; Init fills in every field.
type K8SService struct {
// Scheme is used when setting controller owner references and when building
// the controller-runtime client in Init.
Scheme *runtime.Scheme
// CoreClient is the typed core/v1 client (Secrets, Pods, Services, ConfigMaps).
CoreClient coreV1Client.CoreV1Interface
// appsV1Client is the typed apps/v1 client, used here to read and patch Deployments.
appsV1Client *appsV1Client.AppsV1Client
// networkingClient is the typed networking/v1 client, used here to read Ingresses.
networkingClient networkingClient.NetworkingV1Interface
// client is the controller-runtime client, used here to read Keycloak custom resources.
client k8sClient.Client
}
// GetExternalEndpoint returns the host and scheme of the named Ingress.
//
// The host is read from the first rule of the Ingress spec and the scheme is
// always platformHelper.RouteHTTPSScheme. An error is returned when the Ingress
// does not exist, cannot be fetched, or does not define any rule.
func (s *K8SService) GetExternalEndpoint(namespace, name string) (string, string, error) {
	ctx := context.Background()

	ingress, err := s.networkingClient.Ingresses(namespace).Get(ctx, name, metaV1.GetOptions{})
	if err != nil {
		if k8sErrors.IsNotFound(err) {
			return "", "", fmt.Errorf("ingress %v in namespace %v not found", name, namespace)
		}

		return "", "", fmt.Errorf("failed to Get Ingress %q: %w", name, err)
	}

	// Rules is optional in the Ingress spec, so indexing it unconditionally
	// would panic for an Ingress that only declares a default backend.
	if len(ingress.Spec.Rules) == 0 {
		return "", "", fmt.Errorf("ingress %v in namespace %v has no rules", name, namespace)
	}

	return ingress.Spec.Rules[0].Host, platformHelper.RouteHTTPSScheme, nil
}
// IsDeploymentReady reports whether the Deployment that shares the Gerrit
// resource's name and namespace has exactly one updated replica and exactly
// one available replica. A failure to fetch the Deployment is returned as an error.
func (s *K8SService) IsDeploymentReady(gerrit *gerritApi.Gerrit) (bool, error) {
	deployments := s.appsV1Client.Deployments(gerrit.Namespace)

	found, err := deployments.Get(context.Background(), gerrit.Name, metaV1.GetOptions{})
	if err != nil {
		return false, fmt.Errorf("failed to Get Deployment %q: %w", gerrit.Name, err)
	}

	updated := found.Status.UpdatedReplicas == 1
	available := found.Status.AvailableReplicas == 1

	return updated && available, nil
}
// PatchDeploymentEnv merges env into the environment of the Gerrit container of
// the Deployment named after the Gerrit resource and sends the result to the
// API server as a strategic merge patch.
//
// The Deployment is fetched first, so a missing Deployment is reported even when
// env is empty; with an empty env nothing is patched and nil is returned.
func (s *K8SService) PatchDeploymentEnv(gerrit *gerritApi.Gerrit, env []coreV1Api.EnvVar) error {
	ctx := context.Background()

	d, err := s.appsV1Client.Deployments(gerrit.Namespace).Get(ctx, gerrit.Name, metaV1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to Get Deployment %q: %w", gerrit.Name, err)
	}

	if len(env) == 0 {
		return nil
	}

	containers := d.Spec.Template.Spec.Containers

	// NOTE(review): SelectContainer is assumed to return a copy of the chosen
	// container, which is why the result has to be written back below.
	container, err := platformHelper.SelectContainer(containers, gerrit.Name)
	if err != nil {
		return fmt.Errorf("not containers found for gerrit resource %q: %w", gerrit.Name, err)
	}

	container.Env = platformHelper.UpdateEnv(container.Env, env)

	// Replace the container in place. Appending it unconditionally would put two
	// entries with the same name into the container list sent in the patch.
	replaced := false

	for i := range containers {
		if containers[i].Name == container.Name {
			containers[i] = container
			replaced = true

			break
		}
	}

	if !replaced {
		containers = append(containers, container)
	}

	d.Spec.Template.Spec.Containers = containers

	jsonDc, err := json.Marshal(d)
	if err != nil {
		return fmt.Errorf("failed encode to json gerrit deployment k8s object %q: %w", gerrit.Name, err)
	}

	_, err = s.appsV1Client.Deployments(d.Namespace).Patch(ctx, d.Name, types.StrategicMergePatchType, jsonDc, metaV1.PatchOptions{})
	if err != nil {
		return fmt.Errorf("failed to Patch Deployment %q: %w", d.Name, err)
	}

	return nil
}
// Init stores the scheme and builds every client the service needs from config:
// the typed core/v1, apps/v1 and networking/v1 clients and a controller-runtime
// client bound to the same scheme. Clients are assigned as they are created, so
// on failure the fields set before the failing step remain populated.
func (s *K8SService) Init(config *rest.Config, scheme *runtime.Scheme) error {
	s.Scheme = scheme

	core, err := coreV1Client.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("failed to create k8s coreV1Client: %w", err)
	}

	s.CoreClient = core

	apps, err := appsV1Client.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("failed to create k8s AppsV1Client: %w", err)
	}

	s.appsV1Client = apps

	networking, err := networkingClient.NewForConfig(config)
	if err != nil {
		return errors.Wrap(err, "Failed to init extensions V1 client for K8S")
	}

	s.networkingClient = networking

	clientOptions := k8sClient.Options{Scheme: s.Scheme}

	runtimeClient, err := k8sClient.New(config, clientOptions)
	if err != nil {
		return errors.Wrap(err, "Failed to create k8s client")
	}

	s.client = runtimeClient

	return nil
}
// sshPortPattern matches the first run of decimal digits in a string. It is
// compiled once at package scope instead of on every loop iteration.
var sshPortPattern = regexp.MustCompile(`\d+`)

// GetDeploymentSSHPort extracts the SSH port from the environment of the first
// container of the Deployment named after the Gerrit resource.
//
// It scans the container's environment for variables named spec.SSHListnerEnvName
// and returns the first run of digits found in such a value, parsed as an int32.
// It returns 0 and a nil error when no such variable carries a number, and an
// error when the Deployment cannot be fetched, has no containers, or the digits
// do not fit into an int32.
func (s *K8SService) GetDeploymentSSHPort(instance *gerritApi.Gerrit) (int32, error) {
	ctx := context.Background()

	d, err := s.appsV1Client.Deployments(instance.Namespace).Get(ctx, instance.Name, metaV1.GetOptions{})
	if err != nil {
		return 0, fmt.Errorf("failed to GET Deployment %q: %w", instance.Name, err)
	}

	containers := d.Spec.Template.Spec.Containers
	if len(containers) == 0 {
		// Guard the index below; an empty container list would otherwise panic.
		return 0, fmt.Errorf("deployment %q has no containers", instance.Name)
	}

	for _, env := range containers[0].Env {
		if env.Name != spec.SSHListnerEnvName {
			continue
		}

		// The pattern cannot match an empty string, so "" means "no digits".
		port := sshPortPattern.FindString(env.Value)
		if port == "" {
			continue
		}

		portNumber, err := strconv.ParseInt(port, Base, BitSize)
		if err != nil {
			return 0, fmt.Errorf("failed to parse port value %q: %w", port, err)
		}

		return int32(portNumber), nil
	}

	return 0, nil
}
// GenerateKeycloakSettings builds the environment variables that configure
// Keycloak OAuth for the given Gerrit instance: the auth type, the client id
// (the instance name), the realm name, the Keycloak root URL and a reference to
// the "clientSecret" key of the identity-service credentials Secret.
// Errors from resolving the realm or the root URL are returned unchanged.
func (s *K8SService) GenerateKeycloakSettings(instance *gerritApi.Gerrit) ([]coreV1Api.EnvVar, error) {
	realm, err := s.getKeycloakRealm(instance)
	if err != nil {
		return nil, err
	}

	rootURL, err := s.getKeycloakRootUrl(instance)
	if err != nil {
		return nil, err
	}

	secretName := fmt.Sprintf("%v-%v", instance.Name, spec.IdentityServiceCredentialsSecretPostfix)

	clientSecretSource := &coreV1Api.EnvVarSource{
		SecretKeyRef: &coreV1Api.SecretKeySelector{
			LocalObjectReference: coreV1Api.LocalObjectReference{Name: secretName},
			Key:                  "clientSecret",
		},
	}

	settings := []coreV1Api.EnvVar{
		{Name: "AUTH_TYPE", Value: "OAUTH"},
		{Name: "OAUTH_KEYCLOAK_CLIENT_ID", Value: instance.Name},
		{Name: "OAUTH_KEYCLOAK_REALM", Value: realm.Spec.RealmName},
		{Name: "OAUTH_KEYCLOAK_ROOT_URL", Value: *rootURL},
		{Name: "OAUTH_KEYCLOAK_CLIENT_SECRET", ValueFrom: clientSecretSource},
	}

	return settings, nil
}
// getKeycloakRealm resolves the KeycloakRealm resource for the Gerrit instance.
//
// When the instance names a realm, realms in the instance namespace are first
// searched by the "targetRealm" label and then by Spec.RealmName; the first match
// wins. If no realm is named, or nothing matched, the realm resource called
// "main" in the instance namespace is fetched instead.
func (s *K8SService) getKeycloakRealm(instance *gerritApi.Gerrit) (*keycloakApi.KeycloakRealm, error) {
	ctx := context.Background()
	wanted := instance.Spec.KeycloakSpec.Realm

	if wanted != "" {
		realms := keycloakApi.KeycloakRealmList{}

		// First attempt: select by label within the namespace.
		byLabel := k8sClient.ListOptions{Namespace: instance.Namespace}
		k8sClient.MatchingLabels{"targetRealm": wanted}.ApplyToList(&byLabel)

		if err := s.client.List(ctx, &realms, &byLabel); err != nil {
			return nil, errors.Wrap(err, "unable to get reams by label")
		}

		if len(realms.Items) > 0 {
			return &realms.Items[0], nil
		}

		// Second attempt: list everything in the namespace and compare realm names.
		inNamespace := &k8sClient.ListOptions{Namespace: instance.Namespace}
		if err := s.client.List(ctx, &realms, inNamespace); err != nil {
			return nil, errors.Wrap(err, "unable to get all reams")
		}

		for i := range realms.Items {
			if realms.Items[i].Spec.RealmName == wanted {
				return &realms.Items[i], nil
			}
		}
	}

	// Fallback: the realm resource named "main".
	const fallbackName = "main"

	key := types.NamespacedName{Namespace: instance.Namespace, Name: fallbackName}
	fallback := &keycloakApi.KeycloakRealm{}

	if err := s.client.Get(ctx, key, fallback); err != nil {
		return nil, fmt.Errorf("failed to GET KeycloakRealm resorce %q: %w", fallbackName, err)
	}

	return fallback, nil
}
// getKeycloakRootUrl returns the URL of the Keycloak resource that owns the
// realm resolved for the Gerrit instance. The owner is taken from the realm's
// first owner reference and looked up in the instance namespace; a realm without
// owner references is an error.
func (s *K8SService) getKeycloakRootUrl(instance *gerritApi.Gerrit) (*string, error) {
	realm, err := s.getKeycloakRealm(instance)
	if err != nil {
		return nil, err
	}

	owners := realm.OwnerReferences
	if len(owners) == 0 {
		return nil, errors.Errorf("realm [%s] does not have owner refs", realm.Name)
	}

	ownerName := owners[0].Name
	key := types.NamespacedName{Namespace: instance.Namespace, Name: ownerName}

	var owner keycloakApi.Keycloak
	if err = s.client.Get(context.Background(), key, &owner); err != nil {
		return nil, fmt.Errorf("failed to GET Keycloak resorce %q: %w", ownerName, err)
	}

	rootURL := owner.Spec.Url

	return &rootURL, nil
}
// GetSecretData returns the Data field of the named Secret.
// A Secret that does not exist is logged and reported as (nil, nil); any other
// lookup failure is returned as an error.
func (s *K8SService) GetSecretData(namespace, name string) (map[string][]byte, error) {
	log.Info("getting secret data", nameKey, name)

	found, err := s.CoreClient.Secrets(namespace).Get(context.Background(), name, metaV1.GetOptions{})

	switch {
	case err == nil:
		return found.Data, nil
	case k8sErrors.IsNotFound(err):
		log.Info(fmt.Sprintf("Secret %v in namespace %v not found", name, namespace))

		return nil, nil
	default:
		return nil, fmt.Errorf("failed to GET secret resorce %q: %w", name, err)
	}
}
// GetPods returns the Pods of the namespace that match filter.
//
// A nil filter is treated as "no filter" (the zero ListOptions) rather than
// being dereferenced. On failure an empty, non-nil PodList is returned together
// with the error.
func (s *K8SService) GetPods(namespace string, filter *metaV1.ListOptions) (*coreV1Api.PodList, error) {
	ctx := context.Background()

	var opts metaV1.ListOptions
	if filter != nil {
		opts = *filter
	}

	podList, err := s.CoreClient.Pods(namespace).List(ctx, opts)
	if err != nil {
		return &coreV1Api.PodList{}, fmt.Errorf("failed to GET list of Pods: %w", err)
	}

	return podList, nil
}
// ExecInPod runs command in the first container of the named Pod and returns
// the captured standard output and standard error.
//
// The Pod is fetched first, then an exec request is streamed over SPDY using a
// REST config obtained from newRestConfig on every call. Both streams are
// buffered in memory. When streaming fails, stdout is nil and stderr still
// carries whatever was captured before the failure.
func (s *K8SService) ExecInPod(namespace, podName string, command []string) (stdout, stderr io.Reader, err error) {
	ctx := context.Background()

	pod, err := s.CoreClient.Pods(namespace).Get(ctx, podName, metaV1.GetOptions{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to GET Pod %q: %w", podName, err)
	}

	execOptions := &coreV1Api.PodExecOptions{
		Container: pod.Spec.Containers[0].Name,
		Command:   command,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}

	req := s.CoreClient.RESTClient().
		Post().
		Namespace(pod.Namespace).
		Resource("pods").
		Name(pod.Name).
		SubResource("exec").
		VersionedParams(execOptions, k8sScheme.ParameterCodec)

	restConfig, err := newRestConfig()
	if err != nil {
		return nil, nil, err
	}

	executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if err != nil {
		return nil, nil, fmt.Errorf("failed to connect to the %q server: %w", req.URL(), err)
	}

	var outBuf, errBuf bytes.Buffer

	streams := remotecommand.StreamOptions{
		Stdin:  nil,
		Stdout: &outBuf,
		Stderr: &errBuf,
		Tty:    false,
	}

	if err = executor.StreamWithContext(ctx, streams); err != nil {
		return nil, &errBuf, fmt.Errorf("failed to execute shell-style stream: %w", err)
	}

	return &outBuf, &errBuf, nil
}
// CreateSecret creates a Secret named secretName in the Gerrit resource's
// namespace unless one already exists, in which case nothing is changed.
//
// The new Secret carries data, the labels generated for the Gerrit resource
// with the given labels copied on top, and a controller owner reference to the
// Gerrit resource.
func (s *K8SService) CreateSecret(gerrit *gerritApi.Gerrit, secretName string, data map[string][]byte, labels map[string]string) error {
	ctx := context.Background()

	log.WithValues(nameKey, secretName).Info("creating secret")

	_, getErr := s.CoreClient.Secrets(gerrit.Namespace).Get(ctx, secretName, metaV1.GetOptions{})

	switch {
	case getErr == nil:
		// Already present: leave the existing Secret untouched.
		return nil
	case !k8sErrors.IsNotFound(getErr):
		return fmt.Errorf("failed to GET secret resorce %q: %w", secretName, getErr)
	}

	log.Info("Creating a new Secret for Gerrit", nameKey, secretName)

	secret := newGerritSecret(secretName, gerrit.Name, gerrit.Namespace, data)
	maps.Copy(secret.Labels, labels)

	if err := controllerutil.SetControllerReference(gerrit, secret, s.Scheme); err != nil {
		return fmt.Errorf("failed to set owner %q (Gerrit resource) as a controller OwnerReference on %q (Gerrit Secret resource): %w", gerrit.Name, secret.Name, err)
	}

	if _, err := s.CoreClient.Secrets(secret.Namespace).Create(ctx, secret, metaV1.CreateOptions{}); err != nil {
		return fmt.Errorf("failed to create gerrit Secret resource %q: %w", secret.Name, err)
	}

	log.Info("Secret has been created", nameKey, secret.Name)

	return nil
}
// GetSecret returns the data section of the named Secret.
// A Secret that does not exist is logged and reported as (nil, nil); any other
// lookup failure is returned as an error.
func (s *K8SService) GetSecret(namespace, name string) (map[string][]byte, error) {
	found, err := s.CoreClient.Secrets(namespace).Get(context.Background(), name, metaV1.GetOptions{})
	if err != nil {
		if !k8sErrors.IsNotFound(err) {
			// Anything other than "not found" is unexpected.
			return nil, fmt.Errorf("failed to GET %q secret: %w", name, err)
		}

		log.Info(fmt.Sprintf("Secret %v in namespace %v not found", name, namespace))

		return nil, nil
	}

	return found.Data, nil
}
// GetService returns the named Service.
// A Service that does not exist is logged and reported as (nil, nil); any other
// lookup failure is returned as an error.
func (s *K8SService) GetService(namespace, name string) (*coreV1Api.Service, error) {
	ctx := context.Background()

	service, err := s.CoreClient.Services(namespace).Get(ctx, name, metaV1.GetOptions{})
	if err == nil {
		return service, nil
	}

	if k8sErrors.IsNotFound(err) {
		// The logger does not interpret printf verbs: extra arguments are treated
		// as key/value pairs, so the message must be formatted before it is logged.
		log.Info(fmt.Sprintf("Service %v in namespace %v not found", name, namespace))

		return nil, nil
	}

	// got unexpected error
	return nil, fmt.Errorf("failed to GET %q service: %w", name, err)
}
// UpdateService sets both the port and the target port of the Service port
// named "ssh" to nodePort and persists the Service through the API server.
// The passed Service object is modified in place.
func (s *K8SService) UpdateService(svc *coreV1Api.Service, nodePort int32) error {
	patchedPorts, err := updatePort(svc.Spec.Ports, "ssh", nodePort)
	if err != nil {
		return err
	}

	svc.Spec.Ports = patchedPorts

	services := s.CoreClient.Services(svc.Namespace)
	if _, err = services.Update(context.Background(), svc, metaV1.UpdateOptions{}); err != nil {
		return fmt.Errorf("faile to update %q service: %w", svc.Name, err)
	}

	return nil
}
// newGerritSecret builds (without creating it in the cluster) an Opaque Secret
// with the given name, namespace and data, labelled with the labels generated
// for gerritName.
func newGerritSecret(name, gerritName, namespace string, data map[string][]byte) *coreV1Api.Secret {
	meta := metaV1.ObjectMeta{
		Name:      name,
		Namespace: namespace,
		Labels:    platformHelper.GenerateLabels(gerritName),
	}

	secret := coreV1Api.Secret{
		ObjectMeta: meta,
		Data:       data,
		Type:       "Opaque",
	}

	return &secret
}
// newRestConfig loads a REST config through clientcmd using the default client
// config loading rules and no overrides.
func newRestConfig() (*rest.Config, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}

	deferred := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)

	cfg, err := deferred.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to retrive config: %w", err)
	}

	return cfg, nil
}
// updatePort sets Port and TargetPort.IntVal to nodePort on every entry of
// ports whose name equals name. The slice is modified in place and returned;
// the returned error is always nil.
func updatePort(ports []coreV1Api.ServicePort, name string, nodePort int32) ([]coreV1Api.ServicePort, error) {
	for i := range ports {
		if ports[i].Name != name {
			continue
		}

		ports[i].Port = nodePort
		ports[i].TargetPort.IntVal = nodePort
	}

	return ports, nil
}
// CreateConfigMap creates a ConfigMap named configMapName in the Gerrit
// instance's namespace unless one already exists, in which case nothing is
// changed.
//
// The new ConfigMap carries configMapData, the labels generated for the
// instance, and a controller owner reference to the instance.
func (s *K8SService) CreateConfigMap(instance *gerritApi.Gerrit, configMapName string, configMapData map[string]string) error {
	ctx := context.Background()
	labels := platformHelper.GenerateLabels(instance.Name)

	configMapObject := &coreV1Api.ConfigMap{
		ObjectMeta: metaV1.ObjectMeta{
			Name:      configMapName,
			Namespace: instance.Namespace,
			Labels:    labels,
		},
		Data: configMapData,
	}

	if err := controllerutil.SetControllerReference(instance, configMapObject, s.Scheme); err != nil {
		return errors.Wrapf(err, "Couldn't set reference for Config Map %v object", configMapObject.Name)
	}

	_, err := s.CoreClient.ConfigMaps(instance.Namespace).Get(ctx, configMapObject.Name, metaV1.GetOptions{})
	if err == nil {
		// Already present: leave the existing ConfigMap untouched.
		return nil
	}

	if !k8sErrors.IsNotFound(err) {
		return errors.Wrapf(err, "Couldn't get ConfigMap %v object", configMapObject.Name)
	}

	cm, err := s.CoreClient.ConfigMaps(configMapObject.Namespace).Create(ctx, configMapObject, metaV1.CreateOptions{})
	if err != nil {
		// Report the requested name: the object returned next to an error is not
		// guaranteed to be populated (or even non-nil), so it must not be read here.
		return errors.Wrapf(err, "Couldn't create Config Map %v object", configMapObject.Name)
	}

	log.Info(fmt.Sprintf("ConfigMap %s/%s has been created", cm.Namespace, cm.Name))

	return nil
}