modular_sdk/services/application_service.py (168 lines of code) (raw):

from typing import Optional, Iterator, List from pynamodb.expressions.condition import Condition from pynamodb.pagination import ResultIterator from modular_sdk.commons import RESPONSE_BAD_REQUEST_CODE, \ RESPONSE_RESOURCE_NOT_FOUND_CODE, generate_id, default_instance from modular_sdk.commons.constants import AVAILABLE_APPLICATION_TYPES, \ ApplicationType from modular_sdk.commons.exception import ModularException from modular_sdk.commons.log_helper import get_logger from modular_sdk.commons.time_helper import utc_datetime from modular_sdk.models.application import Application from modular_sdk.modular import Modular from modular_sdk.services.customer_service import CustomerService _LOG = get_logger(__name__) class ApplicationService: def __init__(self, customer_service: CustomerService): self.customer_service = customer_service def build(self, customer_id: str, type: str, description: str, created_by: str, application_id: Optional[str] = None, is_deleted=False, meta: Optional[dict] = None, secret: Optional[str] = None) -> Application: application_id = application_id or generate_id() if type not in AVAILABLE_APPLICATION_TYPES: _LOG.error(f'Invalid application type specified. Available ' f'options: \'{AVAILABLE_APPLICATION_TYPES}\'') raise ModularException( code=RESPONSE_BAD_REQUEST_CODE, content=f'Invalid application type specified. Available ' f'options: \'{AVAILABLE_APPLICATION_TYPES}\'' ) if not self.customer_service.get(name=customer_id): _LOG.error(f'Customer with name \'{customer_id}\' does not exist.') raise ModularException( code=RESPONSE_RESOURCE_NOT_FOUND_CODE, content=f'Customer with name \'{customer_id}\' does not exist.' ) return Application( application_id=application_id, customer_id=customer_id, type=type.value if isinstance(type, ApplicationType) else type, description=description, is_deleted=is_deleted, meta=meta, secret=secret, created_by=created_by ) @staticmethod def get_application_by_id(application_id) -> Optional[Application]: return Application.get_nullable(hash_key=application_id) @staticmethod def i_get_application_by_customer( customer_id: str, application_type: Optional[str] = None, deleted: Optional[bool] = None ) -> ResultIterator[Application]: condition = deleted if deleted is None else ( Application.is_deleted == deleted ) rkc = None _type: str = default_instance(application_type, str) if _type: rkc = (Application.type == _type) return Application.customer_id_type_index.query( hash_key=customer_id, range_key_condition=rkc, filter_condition=condition ) @staticmethod def query_by_customer(customer: str, range_key_condition: Optional[Condition] = None, filter_condition: Optional[Condition] = None, limit: Optional[int] = None, last_evaluated_key: Optional[dict | int] = None, rate_limit: Optional[int] = None ) -> ResultIterator[Application]: """ Allows to specify flexible conditions """ return Application.customer_id_type_index.query( hash_key=customer, range_key_condition=range_key_condition, filter_condition=filter_condition, limit=limit, last_evaluated_key=last_evaluated_key, rate_limit=rate_limit ) @staticmethod def list(customer: Optional[str] = None, _type: Optional[str] = None, deleted: Optional[bool] = None, limit: Optional[int] = None, last_evaluated_key: dict | int | None = None ) -> ResultIterator[Application]: condition = None rkc = None if isinstance(deleted, bool): condition &= Application.is_deleted == deleted if isinstance(_type, str): rkc &= (Application.type == _type) if customer: return Application.customer_id_type_index.query( hash_key=customer, range_key_condition=rkc, filter_condition=condition, limit=limit, last_evaluated_key=last_evaluated_key ) if rkc is not None: condition &= rkc return Application.scan( filter_condition=condition, limit=limit, last_evaluated_key=last_evaluated_key ) @staticmethod def save(application: Application): application.save() def update_meta(self, application: Application, updated_by: str): _LOG.debug(f'Going to update application {application.application_id}' f'meta') self.update( application=application, attributes=[ Application.meta ], updated_by=updated_by ) _LOG.debug('Application meta was updated') @staticmethod def update(application: Application, attributes: List, updated_by: str): updatable_attributes = [ Application.description, Application.meta, Application.secret, Application.updated_by, Application.is_deleted ] actions = [] for attribute in attributes: if attribute not in updatable_attributes: _LOG.warning(f'Attribute {attribute.attr_name} ' f'can\'t be updated.') continue python_attr_name = Application._dynamo_to_python_attr( attribute.attr_name) update_value = getattr(application, python_attr_name) actions.append(attribute.set(update_value)) actions.append(Application.updated_by.set(updated_by)) actions.append(Application.update_timestamp.set( int(utc_datetime().timestamp() * 1e3))) application.update(actions=actions) @staticmethod def get_dto(application: Application) -> dict: return application.get_json() @staticmethod def mark_deleted(application: Application): _LOG.debug(f'Going to mark the application ' f'{application.application_id} as deleted') if application.is_deleted: _LOG.warning(f'Application \'{application.application_id}\' ' f'is already deleted.') return application.update(actions=[ Application.is_deleted.set(True), Application.deletion_timestamp.set( int(utc_datetime().timestamp() * 1e3)) ]) _LOG.debug('Application was marked as deleted') @staticmethod def force_delete(application: Application): _LOG.debug(f'Going to delete application {application.application_id}') application.delete() _LOG.debug('Application has been deleted')