docker/services/license_service.py (80 lines of code) (raw):

from mongoengine import DoesNotExist, ValidationError from commons.constants import CUSTOMERS_ATTR, ALGORITHM_ID_ATTR from commons.constants import TENANTS_ATTR, ATTACHMENT_MODEL_ATTR from commons.log_helper import get_logger from commons.time_helper import utc_iso from models.license import License, AllowanceAttribute from models.license import PROHIBITED_ATTACHMENT, PERMITTED_ATTACHMENT from services.setting_service import SettingsService _LOG = get_logger(__name__) class LicenseService: def __init__(self, settings_service: SettingsService): self.settings_service = settings_service @staticmethod def get_license(license_id): try: return License.objects.get(license_key=license_id) except (DoesNotExist, ValidationError): return None @staticmethod def dto(_license: License) -> dict: data = _license.get_json() data.pop(CUSTOMERS_ATTR, None) return data @staticmethod def list(): return list(License.objects) @staticmethod def list_licenses(license_key: str = None): if license_key: license_ = LicenseService.get_license(license_key) return iter([license_, ]) if license_ else [] return LicenseService.list() @staticmethod def get_all_non_expired_licenses(): return list( License.objects.query(expiration__gt=utc_iso()) ) @staticmethod def validate_customers(_license: License, allowed_customers: list): license_customers = list(_license.customers) if not allowed_customers: return license_customers return list(set(license_customers) & set(allowed_customers)) @staticmethod def create(configuration): return License(**configuration) @staticmethod def delete(license_obj: License): return license_obj.delete() def is_applicable_for_customer(self, license_key, customer): license_ = self.get_license(license_id=license_key) if not license_: return False return customer in license_.customers @staticmethod def is_subject_applicable( entity: License, customer: str, tenant: str = None ): """ Predicates whether a subject, such a customer or a tenant within said customer has access to provided license entity. Note: one must verify whether provided tenant belongs to the provided customer, beforehand. :parameter entity: License :parameter customer: str :parameter tenant: Optional[str] :return: bool """ customers = entity.customers.as_dict() scope: dict = customers.get(customer, dict()) model = scope.get(ATTACHMENT_MODEL_ATTR) tenants = scope.get(TENANTS_ATTR, []) retained, _all = tenant in tenants, not tenants attachment = ( model == PERMITTED_ATTACHMENT and (retained or _all), model == PROHIBITED_ATTACHMENT and not (retained or _all) ) return (not tenant) or (tenant and any(attachment)) if scope else False @staticmethod def is_expired(entity: License): return entity.expiration <= utc_iso() @staticmethod def update_license(license_obj: License, license_data: dict): allowance = AllowanceAttribute(**license_data.get('allowance')) license_obj.allowance = allowance license_obj.expiration = license_data.get('valid_until') license_obj.customers = license_data.get(CUSTOMERS_ATTR) license_obj.algorithm_id = license_data.get(ALGORITHM_ID_ATTR) license_obj.save() return license_obj