modular_sdk/models/parent.py (78 lines of code) (raw):

from typing import Optional from pynamodb.attributes import UnicodeAttribute, MapAttribute, \ BooleanAttribute, NumberAttribute from pynamodb.indexes import AllProjection from modular_sdk.commons.constants import COMPOUND_KEYS_SEPARATOR, ParentScope from modular_sdk.models.base_meta import BaseMeta, TABLES_PREFIX from modular_sdk.models.pynamodb_extension.base_model import BaseGSI from modular_sdk.models.pynamodb_extension.base_role_access_model import \ BaseRoleAccessModel PARENT_ID = 'pid' CUSTOMER_ID = 'cid' APPLICATION_ID = 'aid' TYPE = 't' DESCRIPTION = 'descr' META = 'meta' IS_DELETED = 'd' DELETION_DATE = 'dd' # todo deprecated CREATION_TIMESTAMP = 'ct' UPDATE_TIMESTAMP = 'ut' DELETION_TIMESTAMP = 'dt' SCOPE_ATTR = 's' UPDATED_BY = 'ub' CREATED_BY = 'cb' MODULAR_PARENTS_TABLE_NAME = 'Parents' class CustomerIdScopeIndex(BaseGSI): class Meta(BaseMeta): index_name = f'{CUSTOMER_ID}-{SCOPE_ATTR}-index' read_capacity_units = 1 write_capacity_units = 1 projection = AllProjection() customer_id = UnicodeAttribute(hash_key=True, attr_name=CUSTOMER_ID) type_scope = UnicodeAttribute(range_key=True, attr_name=SCOPE_ATTR) # this index currently does not exist. It's for the future :) class ApplicationIdIndex(BaseGSI): class Meta(BaseMeta): index_name = f'{APPLICATION_ID}-index' read_capacity_units = 1 write_capacity_units = 1 projection = AllProjection() application_id = UnicodeAttribute(hash_key=True, attr_name=APPLICATION_ID) class Parent(BaseRoleAccessModel): class Meta(BaseMeta): table_name = f'{TABLES_PREFIX}{MODULAR_PARENTS_TABLE_NAME}' parent_id = UnicodeAttribute(hash_key=True, attr_name=PARENT_ID) customer_id = UnicodeAttribute(attr_name=CUSTOMER_ID) application_id = UnicodeAttribute(attr_name=APPLICATION_ID) type = UnicodeAttribute(attr_name=TYPE) description = UnicodeAttribute(attr_name=DESCRIPTION, null=True) meta = MapAttribute(attr_name=META, default=dict) is_deleted = BooleanAttribute(attr_name=IS_DELETED) deletion_date = UnicodeAttribute(attr_name=DELETION_DATE, null=True) # todo deprecated creation_timestamp = NumberAttribute(attr_name=CREATION_TIMESTAMP, null=True) update_timestamp = NumberAttribute(attr_name=UPDATE_TIMESTAMP, null=True) deletion_timestamp = NumberAttribute(attr_name=DELETION_TIMESTAMP, null=True) # in case the attribute is not null its format must # adhere to [type]#[scope]#[tenant|cloud] type_scope = UnicodeAttribute(attr_name=SCOPE_ATTR, null=True) updated_by = UnicodeAttribute(attr_name=UPDATED_BY, null=True) created_by = UnicodeAttribute(attr_name=CREATED_BY, null=True) customer_id_scope_index = CustomerIdScopeIndex() application_id_index = ApplicationIdIndex() # todo use if self.type is removed # @property # def type(self) -> str: # return self.type_scope.split(COMPOUND_KEYS_SEPARATOR)[0] @property def scope(self) -> Optional[str]: if not self.type_scope: return return self.type_scope.split(COMPOUND_KEYS_SEPARATOR)[1] @property def tenant_name(self) -> Optional[str]: if not self.type_scope: return if self.scope == ParentScope.ALL: # we cannot specify tenant when scope is ALL return return self.type_scope.split(COMPOUND_KEYS_SEPARATOR)[2] @property def cloud(self) -> Optional[str]: if not self.type_scope: return if self.scope == ParentScope.ALL: # we can specify cloud only if scope ALL return self.type_scope.split(COMPOUND_KEYS_SEPARATOR)[2].upper()