# modular_sdk/models/pynamodb_extension/base_model.py

import base64
import binascii
import json
import os
from datetime import datetime
from typing import (Any, Optional, Dict, Sequence, Iterable, Text, Union,
                    Iterator, Type, List)

from dynamodb_json import json_util
from dynamodb_json import json_util as dynamo_json
from pynamodb import indexes
from pynamodb import models
from pynamodb.attributes import (MapAttribute, Attribute, UnicodeAttribute,
                                 NumberAttribute, ListAttribute,
                                 BooleanAttribute, JSONAttribute)
from pynamodb.constants import BOOLEAN, NUMBER, LIST, MAP, NULL, STRING
from pynamodb.exceptions import DoesNotExist, AttributeDeserializationError
from pynamodb.expressions.condition import Condition
from pynamodb.expressions.update import Action
from pynamodb.indexes import _M
from pynamodb.models import _T, _KeyType, BatchWrite
from pynamodb.pagination import ResultIterator
from pynamodb.settings import OperationSettings

from modular_sdk.commons.constants import MODULAR_SERVICE_MODE_ENV, \
    SERVICE_MODE_DOCKER, PARAM_MONGO_USER, PARAM_MONGO_PASSWORD, \
    PARAM_MONGO_URL, PARAM_MONGO_DB_NAME
from modular_sdk.commons.helpers import classproperty
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.commons.time_helper import utc_iso

_LOG = get_logger(__name__)


def build_mongodb_uri(user: str, password: str, url: str) -> str:
    """
    Builds a correctly formatted MongoDB connection URI
    """
    return f'mongodb://{user}:{password}@{url}/'


class M3BooleanAttribute(BooleanAttribute):
    def get_value(self, value: Dict[str, Any]) -> Any:
        if BOOLEAN not in value and NUMBER not in value:
            raise AttributeDeserializationError(self.attr_name,
                                                self.attr_type)
        if value.get(BOOLEAN) is not None:
            return value[BOOLEAN]
        return int(value.get(NUMBER))


class MongoSpecificType(Attribute[Any]):
    """
    Sometimes our models have attributes with different types in DynamoDB
    and MongoDB. A good example is Tenant.regions.rId. In DynamoDB it's a
    string, in MongoDB -> ObjectId. This custom attribute can be used to
    mark such attributes if you know that they differ. Basically, it's the
    same as UnicodeAttribute, but it kind of adds more transparency. Why
    exactly UnicodeAttribute - it does nothing to the value returned from
    DB. Just proxies it. This way we can be sure that we will not corrupt
    some Mongo types (Date, Binary, ObjectId).
    """
    attr_type = STRING

    def serialize(self, value: Any) -> Any:
        """
        This method should return a DynamoDB-compatible value
        """
        return value

    def deserialize(self, value: Any) -> Any:
        """
        Performs any needed deserialization on the value
        """
        return value
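
# A minimal usage sketch (hypothetical value): MongoSpecificType proxies the
# raw value untouched in both directions, so Mongo-native types survive a
# round trip.
#
#   >>> attr = MongoSpecificType()
#   >>> attr.serialize('64f0c0ffee64f0c0ffee64f0')   # e.g. an ObjectId string
#   '64f0c0ffee64f0c0ffee64f0'
#   >>> attr.deserialize('64f0c0ffee64f0c0ffee64f0')
#   '64f0c0ffee64f0c0ffee64f0'
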
class DynamicAttribute(Attribute):
    _types_to_attributes = {
        str: UnicodeAttribute,
        dict: MapAttribute,
        float: NumberAttribute,
        int: NumberAttribute,
        list: ListAttribute,
        tuple: ListAttribute,
        bool: BooleanAttribute,
        bytes: JSONAttribute  # todo, BinaryAttribute would fit better but
    }

    # ... but this class has a bug -> it does not perform data
    # deserialization. The raw value from DB is returned all the time.
    # For Unicode, Number, List, Bool it is more or less acceptable, and
    # it works. For BinaryAttribute it will not work because BinaryAttribute
    # class encodes the data to b64 before sending to DB. As you can guess,
    # it does not decode the data when you receive it from DB.
    def serialize(self, value: Any) -> Any:
        value_type = type(value)
        attribute_class = self._types_to_attributes.get(value_type)
        if not attribute_class:
            raise AssertionError(
                f'There is no serializer for type: {value_type}')
        if attribute_class is JSONAttribute and isinstance(value, bytes):
            value = json.loads(value)
        attribute_instance = attribute_class()
        self.attr_type = attribute_class.attr_type
        return attribute_instance.serialize(value)

    def get_value(self, value: Any) -> Any:
        attr_type = list(value.keys())
        if len(attr_type) > 1:
            raise AssertionError(
                f'Unexpected key specifier: {attr_type}:{len(attr_type)}; '
                f'Attribute: {value}')
        return dynamo_json.loads(value)


class ModelEncoder(json.JSONEncoder):
    """
    It converts the item to a DTO-only representation. Do not use get_json()
    on a model and then write the result to DB again. Such actions corrupt
    the item.
    """

    def default(self, obj):
        if hasattr(obj, 'attribute_values'):
            return obj.attribute_values
        elif isinstance(obj, datetime):
            return utc_iso(_from=obj)
        else:  # ObjectId, bytes and others
            return str(obj)
        # return json.JSONEncoder.default(self, obj)


def json_to_attribute_value(value: Any) -> Dict[str, Any]:
    """
    Overrides the one from "pynamodb.util" to handle MongoDB-specific
    attributes such as ObjectId, Date, Binary and others
    :param value:
    :return:
    """
    if value is None:
        return {NULL: True}
    if value is True or value is False:
        return {BOOLEAN: value}
    if isinstance(value, (int, float)):
        return {NUMBER: json.dumps(value)}
    if isinstance(value, str):
        return {STRING: value}
    if isinstance(value, list):
        return {LIST: [json_to_attribute_value(v) for v in value]}
    if isinstance(value, dict):
        return {MAP: {k: json_to_attribute_value(v)
                      for k, v in value.items()}}
    # changed part below
    # In case we don't know how to convert an attribute, we just proxy it.
    # STRING is used because PynamoDB does nothing to change the value
    # in case its type is STRING. So, the value won't be corrupted.
    # It's used only for on-prem
    return {STRING: value}
    # raise ValueError("Unknown value type: {}".format(type(value).__name__))
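
# A quick sketch of the mapping above, using standard DynamoDB JSON shapes
# (hypothetical input):
#
#   >>> json_to_attribute_value({'name': 'test', 'count': 2, 'tags': ['a']})
#   {'M': {'name': {'S': 'test'}, 'count': {'N': '2'},
#          'tags': {'L': [{'S': 'a'}]}}}
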
class ABCMongoDBHandlerMixin:
    """
    Must NOT be inherited from :class:`abc.ABC` because it's used as a mixin
    with other classes, in particular with :class:`pynamodb.models.Model`,
    which already has a metaclass. Making this class inherit from ABC
    entails a "metaclass conflict".
    """
    _mongodb = None

    @classmethod
    def mongodb_handler(cls):
        raise NotImplementedError

    @classmethod
    def reset_mongodb(cls):
        cls._mongodb = None


class ModularMongoDBHandlerMixin(ABCMongoDBHandlerMixin):
    @classmethod
    def mongodb_handler(cls):
        if not cls._mongodb:
            from modular_sdk.connections.mongodb_connection import \
                MongoDBConnection
            from modular_sdk.models.pynamodb_extension.pynamodb_to_pymongo_adapter \
                import PynamoDBToPyMongoAdapter
            user = os.environ.get(PARAM_MONGO_USER)
            password = os.environ.get(PARAM_MONGO_PASSWORD)
            url = os.environ.get(PARAM_MONGO_URL)
            db = os.environ.get(PARAM_MONGO_DB_NAME)
            cls._mongodb = PynamoDBToPyMongoAdapter(
                mongodb_connection=MongoDBConnection(
                    build_mongodb_uri(user, password, url), db
                )
            )
        return cls._mongodb


class RawBaseModel(models.Model):
    """
    Raw abstract base model class which exposes the common API to interact
    both with DynamoDB and MongoDB. Nevertheless, it must not be used by
    itself. Use either BaseModel or create your own class with an
    implemented `mongodb_handler`.
    """

    @classmethod
    def mongodb_handler(cls):
        """
        Must return an initialized PynamoDBToPyMongoAdapter, or possibly
        some other class which implements its interface.
        :return: :class:`modular_sdk.models.pynamodb_extension.pynamodb_to_pymongo_adapter.PynamoDBToPyMongoAdapter`
        """
        raise NotImplementedError(
            'You cannot use RawBaseModel by itself. Use BaseModel or'
            ' define your own base class with an implemented '
            '`mongodb_handler` class method')

    @classproperty
    def is_docker(cls) -> bool:
        return os.environ.get(MODULAR_SERVICE_MODE_ENV) == SERVICE_MODE_DOCKER

    @classmethod
    def get_nullable(cls, hash_key, range_key=None, attributes_to_get=None,
                     consistent_read=False):
        if cls.is_docker:
            return cls.mongodb_handler().get_nullable(
                model_class=cls, hash_key=hash_key, sort_key=range_key)
        try:
            return cls.get(hash_key, range_key,
                           attributes_to_get=attributes_to_get,
                           consistent_read=consistent_read)
        except DoesNotExist as e:
            _LOG.debug(f'{cls.__name__} does not exist '
                       f'with the following keys: hash_key={hash_key}, '
                       f'range_key={range_key}: {e.msg}')
            return
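
    # Usage sketch for get_nullable (hypothetical model and key values):
    # absence is a normal outcome here, so no DoesNotExist is raised.
    #
    #   item = MyModel.get_nullable(hash_key='tenant', range_key='region')
    #   if item is None:
    #       ...  # handle the missing item
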
    def save(self, condition: Optional[Condition] = None,
             settings: OperationSettings = OperationSettings.default
             ) -> Dict[str, Any]:
        if self.is_docker:
            return self.mongodb_handler().save(model_instance=self)
        return super().save(condition, settings)

    @classmethod
    def batch_get(
            cls: Type[_T],
            items: Iterable[Union[_KeyType, Iterable[_KeyType]]],
            consistent_read: Optional[bool] = None,
            attributes_to_get: Optional[Sequence[str]] = None,
            settings: OperationSettings = OperationSettings.default
    ) -> Iterator[_T]:
        if cls.is_docker:
            return cls.mongodb_handler().batch_get(
                model_class=cls, items=items,
                attributes_to_get=attributes_to_get)
        return super().batch_get(items, consistent_read, attributes_to_get,
                                 settings)

    @classmethod
    def batch_write(cls: Type[_T], auto_commit: bool = True,
                    settings: OperationSettings = OperationSettings.default
                    ) -> BatchWrite[_T]:
        if cls.is_docker:
            return cls.mongodb_handler().batch_write(model_class=cls)
        return super().batch_write(auto_commit, settings)

    def delete(self, condition: Optional[Condition] = None,
               settings: OperationSettings = OperationSettings.default
               ) -> Any:
        if self.is_docker:
            return self.mongodb_handler().delete(model_instance=self)
        return super().delete(condition, settings)

    def update(self, actions: List[Action],
               condition: Optional[Condition] = None,
               settings: OperationSettings = OperationSettings.default
               ) -> Any:
        if self.is_docker:
            return self.mongodb_handler().update(self, actions, condition,
                                                 settings)
        return super().update(actions, condition, settings)

    def refresh(self, consistent_read: bool = False,
                settings: OperationSettings = OperationSettings.default
                ) -> None:
        if self.is_docker:
            return self.mongodb_handler().refresh(
                consistent_read=consistent_read)
        return super().refresh(consistent_read, settings)

    @classmethod
    def get(
            cls: Type[_T],
            hash_key: _KeyType,
            range_key: Optional[_KeyType] = None,
            consistent_read: bool = False,
            attributes_to_get: Optional[Sequence[Text]] = None,
            settings: OperationSettings = OperationSettings.default
    ) -> _T:
        if cls.is_docker:
            return cls.mongodb_handler().get(
                model_class=cls, hash_key=hash_key, range_key=range_key)
        return super().get(hash_key, range_key, consistent_read,
                           attributes_to_get, settings)

    @classmethod
    def count(
            cls: Type[_T],
            hash_key: Optional[_KeyType] = None,
            range_key_condition: Optional[Condition] = None,
            filter_condition: Optional[Condition] = None,
            consistent_read: bool = False,
            index_name: Optional[str] = None,
            limit: Optional[int] = None,
            rate_limit: Optional[float] = None,
            settings: OperationSettings = OperationSettings.default,
    ) -> int:
        if cls.is_docker:
            return cls.mongodb_handler().count(
                model_class=cls,
                hash_key=hash_key,
                range_key_condition=range_key_condition,
                filter_condition=filter_condition,
                index_name=index_name,
                limit=limit
            )
        return super().count(hash_key, range_key_condition,
                             filter_condition, consistent_read, index_name,
                             limit, rate_limit, settings)

    @classmethod
    def query(
            cls: Type[_T],
            hash_key: _KeyType,
            range_key_condition: Optional[Condition] = None,
            filter_condition: Optional[Condition] = None,
            consistent_read: bool = False,
            index_name: Optional[str] = None,
            scan_index_forward: Optional[bool] = None,
            limit: Optional[int] = None,
            last_evaluated_key: Optional[Dict[str, Dict[str, Any]]] = None,
            attributes_to_get: Optional[Iterable[str]] = None,
            page_size: Optional[int] = None,
            rate_limit: Optional[float] = None,
            settings: OperationSettings = OperationSettings.default,
    ) -> ResultIterator[_T]:
        if cls.is_docker:
            return cls.mongodb_handler().query(
                model_class=cls, hash_key=hash_key,
                filter_condition=filter_condition,
                range_key_condition=range_key_condition, limit=limit,
                last_evaluated_key=last_evaluated_key,
                attributes_to_get=attributes_to_get,
                scan_index_forward=scan_index_forward
            )
        return super().query(hash_key, range_key_condition,
                             filter_condition, consistent_read, index_name,
                             scan_index_forward, limit, last_evaluated_key,
                             attributes_to_get, page_size, rate_limit,
                             settings)

    @classmethod
    def scan(
            cls: Type[_T],
            filter_condition: Optional[Condition] = None,
            segment: Optional[int] = None,
            total_segments: Optional[int] = None,
            limit: Optional[int] = None,
            last_evaluated_key: Optional[Dict[str, Dict[str, Any]]] = None,
            page_size: Optional[int] = None,
            consistent_read: Optional[bool] = None,
            index_name: Optional[str] = None,
            rate_limit: Optional[float] = None,
            attributes_to_get: Optional[Sequence[str]] = None,
            settings: OperationSettings = OperationSettings.default,
    ) -> ResultIterator[_T]:
        if cls.is_docker:
            return cls.mongodb_handler().scan(
                model_class=cls,
                filter_condition=filter_condition,
                limit=limit,
                last_evaluated_key=last_evaluated_key,
                attributes_to_get=attributes_to_get
            )
        return super().scan(filter_condition, segment, total_segments,
                            limit, last_evaluated_key, page_size,
                            consistent_read, index_name, rate_limit,
                            attributes_to_get, settings)
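
    # Query usage sketch (hypothetical model and key values): the same call
    # transparently hits DynamoDB or, in docker mode, the Mongo adapter.
    #
    #   results = MyModel.query(hash_key='tenant', limit=10)
    #   items = list(results)
    #   lek = results.last_evaluated_key  # pass back via last_evaluated_key=
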
    def get_json(self) -> dict:
        """
        Returns a dict which can be dumped to JSON. So, in case the model
        contains Date, Binary or ObjectId -> they will become strings.
        :return:
        """
        return json.loads(json.dumps(self, cls=ModelEncoder))

    def dynamodb_model(self):
        model = self.__unmap_map_attribute(item=self)
        result = self.__model_to_dict(model)
        if hasattr(self, 'mongo_id'):
            result['mongo_id'] = self.mongo_id
        return result

    def get_keys(self):
        return self._get_keys()

    def __model_to_dict(self, model):
        try:
            items = model.items()
        except AttributeError:
            return model
        for key, value in items:
            if isinstance(value, MapAttribute):
                processed_value = self.__unmap_map_attribute(item=value)
                model[key] = self.__model_to_dict(processed_value)
            elif isinstance(value, list):
                model[key] = [self.__model_to_dict(
                    self.__unmap_map_attribute(item=each))
                    for each in value]
            elif isinstance(value, dict):
                # just in case there is datetime inside
                value = self.__model_to_dict(value)
                model[key] = json_util.loads(value)
            # elif isinstance(value, datetime):
            #     model[key] = utc_iso(_from=value)
        return model

    @classmethod
    def from_json(cls, model_json: dict,
                  attributes_to_get: Optional[List] = None,
                  instance: Optional[_T] = None) -> models.Model:
        _id = model_json.pop('_id', None)
        instance = instance or cls()
        if attributes_to_get:
            to_get = set(
                attr.attr_name if isinstance(attr, Attribute) else attr
                for attr in attributes_to_get
            )
            model_json = {k: v for k, v in model_json.items()
                          if k in to_get}
        attribute_values = {k: json_to_attribute_value(v)
                            for k, v in model_json.items()}
        # if uncommented, custom DynamicAttribute won't work due to
        # attr_type property
        # instance._update_attribute_types(attribute_values)
        instance.deserialize(attribute_values)
        instance.mongo_id = _id
        return instance

    @staticmethod
    def __unmap_map_attribute(item):
        try:
            attr_values = item.attribute_values
        except AttributeError:
            return item
        if not attr_values or (isinstance(item, MapAttribute)
                               and type(item) == MapAttribute):
            return attr_values
        processed_item = {}
        for attr_value_key in attr_values.keys():
            py_to_ddb = {py_key: db_key for db_key, py_key
                         in item._dynamo_to_python_attrs.items()}
            match = py_to_ddb.get(attr_value_key)
            if match:
                processed_item[match] = attr_values[attr_value_key]
            else:
                processed_item[attr_value_key] = \
                    attr_values[attr_value_key]
        return processed_item

    def __repr__(self):
        return str(self.__dict__)
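
# Round-trip sketch (hypothetical model and values, a sketch only):
# from_json() builds an instance from a raw document, keeping the Mongo
# '_id' aside as `mongo_id`; get_json() produces a JSON-safe DTO that must
# not be written back to DB (see ModelEncoder above).
#
#   raw = {'_id': '64f0c0ffee64f0c0ffee64f0', 'name': 'test'}
#   item = MyModel.from_json(raw)
#   item.mongo_id    # -> '64f0c0ffee64f0c0ffee64f0'
#   item.get_json()  # -> {'name': 'test', ...other declared attributes}
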
class RawBaseGSI(indexes.GlobalSecondaryIndex):
    @classproperty
    def is_docker(cls) -> bool:
        return os.environ.get(MODULAR_SERVICE_MODE_ENV) == SERVICE_MODE_DOCKER

    @classmethod
    def _range_key_attribute(cls) -> Attribute:
        """
        Returns the attribute class for the range key. One may wonder why
        PynamoDB 5.2.1 does not have this method...
        """
        for attr_cls in cls.Meta.attributes.values():
            if attr_cls.is_range_key:
                return attr_cls

    @classmethod
    def mongodb_handler(cls):
        """
        Must return an initialized PynamoDBToPyMongoAdapter, or possibly
        some other class which implements its interface.
        :return: :class:`modular_sdk.models.pynamodb_extension.pynamodb_to_pymongo_adapter.PynamoDBToPyMongoAdapter`
        """
        raise NotImplementedError(
            'You cannot use RawBaseGSI by itself. Use BaseGSI or'
            ' define your own base class with an implemented '
            '`mongodb_handler` class method')

    @classmethod
    def query(cls,
              hash_key: _KeyType,
              range_key_condition: Optional[Condition] = None,
              filter_condition: Optional[Condition] = None,
              consistent_read: Optional[bool] = False,
              scan_index_forward: Optional[bool] = None,
              limit: Optional[int] = None,
              last_evaluated_key: Optional[Dict[str, Dict[str, Any]]] = None,
              attributes_to_get: Optional[List[str]] = None,
              page_size: Optional[int] = None,
              rate_limit: Optional[float] = None) -> ResultIterator[_M]:
        if cls.is_docker:
            return cls.mongodb_handler().query(
                model_class=cls, hash_key=hash_key,
                filter_condition=filter_condition,
                range_key_condition=range_key_condition, limit=limit,
                last_evaluated_key=last_evaluated_key,
                attributes_to_get=attributes_to_get,
                scan_index_forward=scan_index_forward
            )
        return super().query(hash_key, range_key_condition,
                             filter_condition, consistent_read,
                             scan_index_forward, limit, last_evaluated_key,
                             attributes_to_get, page_size, rate_limit)


class BaseModel(ModularMongoDBHandlerMixin, RawBaseModel):
    pass


class BaseGSI(ModularMongoDBHandlerMixin, RawBaseGSI):
    pass


class LastEvaluatedKey:
    """
    Simple abstraction over the DynamoDB last evaluated key & the MongoDB
    offset :)
    """
    payload_key_name = 'key'

    def __init__(self, lek: Optional[Union[dict, int]] = None):
        self._lek = lek

    def serialize(self) -> str:
        payload = {self.payload_key_name: self._lek}
        return base64.urlsafe_b64encode(
            json.dumps(payload, separators=(",", ":"),
                       sort_keys=True).encode()
        ).decode()

    @classmethod
    def deserialize(cls, s: Optional[str] = None) -> 'LastEvaluatedKey':
        if not s or not isinstance(s, str):
            return cls()
        _payload = {}
        try:
            decoded = base64.urlsafe_b64decode(s.encode()).decode()
            _payload = json.loads(decoded)
        except binascii.Error:
            _LOG.warning('Invalid base64 encoding in last evaluated key '
                         'token')
        except json.JSONDecodeError:
            _LOG.warning('Invalid json string within last evaluated key '
                         'token')
        except Exception as e:  # you never know :)
            _LOG.warning('Some unexpected exception occurred while '
                         f'deserializing last evaluated key token: \'{e}\'')
        return cls(_payload.get(cls.payload_key_name))

    @property
    def value(self) -> Optional[Union[dict, int]]:
        return self._lek

    @value.setter
    def value(self, v: Optional[Union[dict, int]]):
        self._lek = v

    def __bool__(self) -> bool:
        return bool(self._lek)
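
# Pagination token sketch (hypothetical key value): LastEvaluatedKey
# round-trips a DynamoDB key (or a Mongo offset) through a urlsafe base64
# token that can be handed to API clients.
#
#   lek = LastEvaluatedKey({'pk': {'S': 'tenant#1'}})
#   token = lek.serialize()   # opaque string, safe for query params
#   restored = LastEvaluatedKey.deserialize(token)
#   assert restored.value == {'pk': {'S': 'tenant#1'}}
#   bool(LastEvaluatedKey())  # -> False, nothing more to paginate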