pkg/rbac/rbac.go (123 lines of code) (raw):
package rbac
import (
"context"
"fmt"
"github.com/go-logr/logr"
rbacApi "k8s.io/api/rbac/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// crNameLogKey is the structured-logging key under which this package records
// the name of the resource being operated on.
const crNameLogKey = "name"

// Kind names of the RBAC role types.
const (
	// ClusterRoleKind is the kind string "ClusterRole".
	ClusterRoleKind = "ClusterRole"
	// RoleKind is the kind string "Role".
	RoleKind = "Role"
)
// Manager is the set of Role and RoleBinding operations offered by this package.
type Manager interface {
	// GetRoleBinding fetches the RoleBinding called name from namespace.
	GetRoleBinding(name string, namespace string) (*rbacApi.RoleBinding, error)

	// RoleBindingExists reports whether a RoleBinding called name is present in namespace.
	RoleBindingExists(ctx context.Context, name string, namespace string) (bool, error)

	// CreateRoleBinding creates a RoleBinding called name in namespace that
	// binds subjects to roleRef.
	CreateRoleBinding(name string, namespace string, subjects []rbacApi.Subject, roleRef rbacApi.RoleRef) error

	// CreateRoleBindingIfNotExists creates the RoleBinding only when one with
	// the same name is not already present in namespace.
	CreateRoleBindingIfNotExists(ctx context.Context, name string, namespace string, subjects []rbacApi.Subject, roleRef rbacApi.RoleRef) error

	// GetRole fetches the Role called name from namespace.
	GetRole(name string, namespace string) (*rbacApi.Role, error)

	// CreateRole creates a Role called name in namespace with the given rules.
	CreateRole(name string, namespace string, rules []rbacApi.PolicyRule) error
}
// KubernetesRbac implements Manager on top of a controller-runtime client.
type KubernetesRbac struct {
	// client performs the Get and Create calls for Role and RoleBinding objects.
	client client.Client

	// log is the base logger; each method derives a logger from it that
	// carries the resource name.
	log logr.Logger
}
// NewRbacManager builds a Manager backed by the given client and logger.
// The returned Manager holds a KubernetesRbac value (not a pointer).
func NewRbacManager(c client.Client, log logr.Logger) Manager {
	manager := KubernetesRbac{client: c, log: log}

	return manager
}
// GetRoleBinding fetches the RoleBinding identified by name and namespace.
//
// The lookup runs with context.Background() because the method signature
// carries no context. Any error from the client is wrapped with %w and
// returned together with a nil RoleBinding.
func (s KubernetesRbac) GetRoleBinding(name, namespace string) (*rbacApi.RoleBinding, error) {
	s.log.WithValues(crNameLogKey, name, "namespace", namespace).Info("getting role binding")

	key := types.NamespacedName{Namespace: namespace, Name: name}
	binding := new(rbacApi.RoleBinding)

	err := s.client.Get(context.Background(), key, binding)
	if err != nil {
		return nil, fmt.Errorf("failed to get role binding: %w", err)
	}

	return binding, nil
}
// RoleBindingExists reports whether a RoleBinding called name is present in namespace.
//
// A not-found response from the client is not an error: it yields (false, nil).
// Every other client error is wrapped with %w and returned with false.
func (s KubernetesRbac) RoleBindingExists(ctx context.Context, name, namespace string) (bool, error) {
	logger := s.log.WithValues(crNameLogKey, name)
	logger.Info("Checking if RoleBinding exists")

	key := types.NamespacedName{Namespace: namespace, Name: name}

	// The fetched object is discarded; only the outcome of the lookup matters.
	err := s.client.Get(ctx, key, &rbacApi.RoleBinding{})

	switch {
	case err == nil:
		logger.Info("RoleBinding exists")

		return true, nil
	case k8sErrors.IsNotFound(err):
		logger.Info("RoleBinding does not exist")

		return false, nil
	default:
		return false, fmt.Errorf("failed to get role binding: %w", err)
	}
}
// CreateRoleBinding creates a RoleBinding called name in namespace that binds
// subjects to roleRef.
//
// The create call runs with context.Background() because the method signature
// carries no context. A client error is wrapped with %w and returned.
func (s KubernetesRbac) CreateRoleBinding(name, namespace string, subjects []rbacApi.Subject, roleRef rbacApi.RoleRef) error {
	logger := s.log.WithValues(crNameLogKey, name)
	logger.Info("creating rolebinding")

	binding := rbacApi.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Subjects:   subjects,
		RoleRef:    roleRef,
	}

	err := s.client.Create(context.Background(), &binding)
	if err != nil {
		return fmt.Errorf("failed to create role binding: %w", err)
	}

	logger.Info("rolebinding has been created")

	return nil
}
// CreateRoleBindingIfNotExists ensures that a RoleBinding called name exists in
// namespace, creating it with the given subjects and roleRef when it is absent.
//
// It returns nil when the RoleBinding already exists (the existing object is
// left untouched and is not compared against subjects/roleRef) or when it was
// created successfully. Errors from the existence check or from the create
// call are wrapped with %w and returned.
//
// Both the existence check and the create call use ctx, so the caller's
// cancellation and deadline apply to the whole operation.
func (s KubernetesRbac) CreateRoleBindingIfNotExists(
	ctx context.Context,
	name,
	namespace string,
	subjects []rbacApi.Subject,
	roleRef rbacApi.RoleRef,
) error {
	exists, err := s.RoleBindingExists(ctx, name, namespace)
	if err != nil {
		return fmt.Errorf("failed to check if role binding exists: %w", err)
	}

	if exists {
		return nil
	}

	log := s.log.WithValues(crNameLogKey, name)
	log.Info("creating rolebinding")

	rb := &rbacApi.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Subjects: subjects,
		RoleRef:  roleRef,
	}

	// Create directly instead of delegating to CreateRoleBinding, which has no
	// context parameter and would silently drop ctx.
	if err := s.client.Create(ctx, rb); err != nil {
		// Another actor may have created the RoleBinding between the
		// existence check and this call; the desired state is reached, so
		// this is not a failure.
		if k8sErrors.IsAlreadyExists(err) {
			log.Info("rolebinding already exists")

			return nil
		}

		return fmt.Errorf("failed to create role binding: %w", err)
	}

	log.Info("rolebinding has been created")

	return nil
}
// GetRole fetches the Role identified by name and namespace.
//
// The lookup runs with context.Background() because the method signature
// carries no context. Any error from the client is wrapped with %w and
// returned together with a nil Role.
func (s KubernetesRbac) GetRole(name, namespace string) (*rbacApi.Role, error) {
	log := s.log.WithValues(crNameLogKey, name, "namespace", namespace)
	// This method fetches a Role, so the message must not mention a role binding.
	log.Info("getting role")

	r := &rbacApi.Role{}
	if err := s.client.Get(context.Background(), types.NamespacedName{
		Namespace: namespace,
		Name:      name,
	}, r); err != nil {
		return nil, fmt.Errorf("failed to get role: %w", err)
	}

	return r, nil
}
// CreateRole creates a Role called name in namespace carrying the given rules.
//
// The create call runs with context.Background() because the method signature
// carries no context. A client error is wrapped with %w and returned.
func (s KubernetesRbac) CreateRole(name, namespace string, rules []rbacApi.PolicyRule) error {
	logger := s.log.WithValues(crNameLogKey, name)
	logger.Info("creating role")

	role := rbacApi.Role{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Rules:      rules,
	}

	err := s.client.Create(context.Background(), &role)
	if err != nil {
		return fmt.Errorf("failed to create role: %w", err)
	}

	logger.Info("role has been created")

	return nil
}