docker/commons/security.py (60 lines of code) (raw):
from abc import ABC, abstractmethod
from commons.constants import KID_ATTR, ALG_ATTR, TYP_ATTR
from services.clients.abstract_key_management import \
AbstractKeyManagementClient
from base64 import urlsafe_b64encode
from json import dumps
from datetime import datetime
# Payload key under which the token's expiration timestamp is stored.
EXPIRATION_ATTR = 'exp'
# Text encoding used to turn the serialized JSON segments into bytes and to
# decode the finished token back into a string.
ENCODING = 'utf-8'
class AbstractTokenEncoder(ABC):
    """Abstract base for objects that assemble and sign a token.

    The annotated attributes below are not assigned by this class; they
    must be set before ``product`` is read, because ``product`` verifies
    that each one holds an instance of its annotated type.
    """

    # Token: Header fields
    kid: str
    typ: str
    alg: str

    # Token: Production fields
    key_management: AbstractKeyManagementClient

    # Key Management: reference fields
    prk_id: str

    def __init__(self):
        # Delegate initial state set-up to the concrete implementation.
        self._reset()

    @abstractmethod
    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the token being built."""
        # NotImplemented is a comparison sentinel, not an exception; raising
        # it would surface as a confusing TypeError.
        raise NotImplementedError

    @abstractmethod
    def expire(self, dnt: datetime):
        """Set the expiration of the token being built to ``dnt``."""
        raise NotImplementedError

    @abstractmethod
    def _reset(self):
        """Reset the encoder's internal state; called from ``__init__``."""
        raise NotImplementedError

    @property
    @abstractmethod
    def product(self):
        """Validate the annotated attributes prior to producing a token.

        Every attribute named in the class annotations must be present
        and be an instance of its annotated type. This base
        implementation only validates and returns ``None``.

        :raises AssertionError: when an attribute is missing or has the
            wrong type.
        """
        message = 'Could not produce a Token, due to improper {} attribute.'
        # NOTE(review): ``self.__annotations__`` is looked up through the
        # instance's class hierarchy, so a subclass declaring annotations of
        # its own may shadow the ones listed above - confirm if that matters.
        for key, required_type in self.__annotations__.items():
            obj = getattr(self, key, None)
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped when Python runs with optimizations enabled.
            if not isinstance(obj, required_type):
                raise AssertionError(message.format(key))
class TokenEncoder(AbstractTokenEncoder):
    """Encoder producing a dot-separated ``header.payload.signature`` token.

    Payload entries are collected in an internal dictionary; the signature
    is obtained from the configured key management client.
    """

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the pending payload."""
        self._payload[key] = value

    def expire(self, dnt: datetime):
        """Record ``dnt`` (as a POSIX timestamp) as the payload expiration."""
        self._payload.update({EXPIRATION_ATTR: dnt.timestamp()})

    def _reset(self):
        """Start over with an empty payload."""
        self._payload = dict()

    @property
    def product(self):
        """Build, sign and return the token, then clear the payload.

        Reading the parent property first validates the annotated
        attributes.

        :return: the token as a string of three base64url segments
            (padding removed) joined by dots.
        """
        # Evaluated only for its validation side effect.
        super().product

        # The header is assembled separately from the payload, so that
        # header data can be injected independently.
        header = {
            TYP_ATTR: self.typ, ALG_ATTR: self.alg, KID_ATTR: self.kid
        }

        segments = []
        for section in (header, self._payload):
            serialized = dumps(section, separators=(",", ":"))
            segments.append(self._encode(serialized.encode(ENCODING)))
        signing_input = b'.'.join(segments)

        signature = self.key_management.sign(
            key_id=self.prk_id, message=signing_input, algorithm=self.alg
        )
        token = b'.'.join((signing_input, self._encode(signature)))

        # The payload is cleared only once the token has been assembled.
        self._reset()
        return token.decode(ENCODING)

    @staticmethod
    def _encode(data: bytes):
        """Return ``data`` base64url-encoded with the ``=`` padding removed."""
        encoded = urlsafe_b64encode(data)
        return encoded.replace(b"=", b"")