modular_sdk/models/pynamodb_extension/base_safe_update_model.py (115 lines of code) (raw):
from typing import Dict, List, Optional
from pynamodb import models
from pynamodb.attributes import Attribute, MapAttribute, ListAttribute
from modular_sdk.commons import DynamoDBJsonSerializer
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.models.pynamodb_extension.base_model import BaseModel
NULL = 'NULL'
_LOG = get_logger(__name__)
class BaseSafeUpdateModel(BaseModel):
"""
Allows not to override existing attributes that are not specified
in Models on item update.
"""
_additional_data_attr_name = '_additional_data'
@classmethod
def _retrieve_additional_data(cls, document: dict,
attributes: Dict[str, Attribute]) -> dict:
"""
Additional data represents those attributes that are not defined
in Python model, but the do exist in DB. It includes all the nested
mappings and lists of mappings
:param document: raw data from DynamoDB
:param attributes: result of instance.get_attributes()
:return:
{
"not_defined_attr": "value",
"partly_defined_mapping: {
"not_defined_attr": "value"
},
"partly_defined_list": [
{},
{},
{
"not_defined_attr": "value"
}
]
}
"""
name_to_instance = {
attr.attr_name: attr for attr in attributes.values()
}
additional_data = {}
for key, value in document.items():
if key not in name_to_instance: # not defined in model
additional_data[key] = value
continue
# key in value
attr = name_to_instance[key]
if isinstance(attr, MapAttribute) and type(attr) != MapAttribute:
additional_data[key] = cls._retrieve_additional_data(
value or {}, attr.get_attributes()
)
elif isinstance(attr, ListAttribute) and \
attr.element_type and \
issubclass(attr.element_type, MapAttribute) and \
attr.element_type != MapAttribute:
inner_attributes = attr.element_type.get_attributes()
additional_data[key] = [
cls._retrieve_additional_data(v or {}, inner_attributes)
for v in value
]
# else:
# pass
return additional_data
@classmethod
def _update_with_additional_data(cls, document: dict,
additional_data: dict):
"""
Kind of deep update
:param document:
:param additional_data:
:return:
"""
for key, value in additional_data.items():
if key not in document: # not defined
document[key] = value
continue
# deep update
doc = document[key]
if type(doc) != type(value):
_LOG.warning(
'Somehow the type of existing model declaration '
f'does not correspond to the type of additional '
f'value: {doc} - {value}'
)
continue
if isinstance(doc, dict):
cls._update_with_additional_data(doc, value)
elif isinstance(doc, list): # list of dicts
# here there is a problem. We keep nested additional
# data for lists by its order. But nothing prevents us from
# changing the number of items in the list or clearing it,
# for example. Currently, it will work correctly in case the
# items of the list are not impaired
for i, dct in enumerate(doc):
_data = value[i] if len(value) > i else None
if _data:
cls._update_with_additional_data(dct, _data)
@classmethod
def _instantiate(cls, attribute_values):
instance = super()._instantiate(attribute_values)
additional_data = cls._retrieve_additional_data(
DynamoDBJsonSerializer.deserialize_model(attribute_values),
instance.get_attributes()
)
setattr(instance, cls._additional_data_attr_name, additional_data)
return instance
def _get_save_args(self, null_check: bool = True, condition=None,
add_version_condition: bool = True):
"""
Gets the proper *args, **kwargs for saving and retrieving this object
:param null_check: If True, then attributes are checked for null
:param condition: If set, a condition
"""
attribute_values = self.serialize(null_check)
# ---- our code below ----
dct = DynamoDBJsonSerializer.deserialize_model(attribute_values)
self._update_with_additional_data(
document=dct,
additional_data=getattr(self, self._additional_data_attr_name, {})
)
attribute_values = DynamoDBJsonSerializer.serialize_model(dct)
# ---- our code above ----
hash_key_attribute = self._hash_key_attribute()
hash_key = attribute_values.pop(hash_key_attribute.attr_name, {}).get(
hash_key_attribute.attr_type)
range_key = None
range_key_attribute = self._range_key_attribute()
if range_key_attribute:
range_key = attribute_values.pop(range_key_attribute.attr_name,
{}).get(
range_key_attribute.attr_type)
args = (hash_key,)
kwargs = {}
if range_key is not None:
kwargs['range_key'] = range_key
version_condition = self._handle_version_attribute(
attributes=attribute_values)
if add_version_condition and version_condition is not None:
condition &= version_condition
kwargs['attributes'] = attribute_values
kwargs['condition'] = condition
return args, kwargs
def dynamodb_model(self):
"""For MongoDB"""
result = super().dynamodb_model()
self._update_with_additional_data(
document=result,
additional_data=getattr(self, self._additional_data_attr_name, {})
)
return result
@classmethod
def from_json(cls, model_json: dict,
attributes_to_get: Optional[List] = None, instance=None
) -> Optional[models.Model]:
"""
For MongoDB
TODO use attributes_to_get as projection expression
:param model_json:
:param attributes_to_get:
:param instance:
:return:
"""
if not model_json:
return
_additional_data = \
cls._retrieve_additional_data(model_json, cls.get_attributes())
_additional_data.pop('_id', None)
instance = super().from_json(model_json, attributes_to_get, instance)
setattr(instance, cls._additional_data_attr_name, _additional_data)
return instance