in modular_sdk/models/pynamodb_extension/pynamodb_to_pymongo_adapter.py [0:0]
def query(self, model_class, hash_key, range_key_condition=None,
filter_condition=None, limit=None, last_evaluated_key=None,
attributes_to_get=None, scan_index_forward=True):
# works both for Model and Index
hash_key_name = getattr(model_class._hash_key_attribute(),
'attr_name', None)
range_key_name = getattr(model_class._range_key_attribute(),
'attr_name', None)
if issubclass(model_class, indexes.Index):
model_class = model_class.Meta.model
collection = self._collection_from_model(model_class)
_query = {hash_key_name: hash_key}
if range_key_condition is not None:
_query.update(ConditionConverter.convert(range_key_condition))
if filter_condition is not None:
_query.update(ConditionConverter.convert(filter_condition))
limit = limit or 0 # ZERO means no limit
last_evaluated_key = last_evaluated_key or 0
cursor = collection.find(_query).limit(limit).skip(last_evaluated_key)
if range_key_name:
cursor = cursor.sort(
range_key_name, ASCENDING if scan_index_forward else
DESCENDING
)
return Result(
result=(model_class.from_json(self.mongodb.decode_keys(i),
attributes_to_get) for i in cursor),
_evaluated_key=last_evaluated_key,
page_size=collection.count_documents(_query)
)