in modular_sdk/services/parent_service.py [0:0]
def query_by_scope_index(self, customer_id: str,
type_: Optional[ParentType] = None,
scope: Optional[ParentScope] = None,
tenant_or_cloud: Optional[str] = None,
by_prefix: Optional[bool] = False,
is_deleted: Optional[bool] = False,
limit: Optional[int] = None,
last_evaluated_key: Optional[dict] = None,
ascending: Optional[bool] = True
) -> Iterator[Parent]:
"""
Low-level query method
:param customer_id:
:param type_:
:param scope:
:param tenant_or_cloud:
:param by_prefix:
:param is_deleted:
:param limit:
:param last_evaluated_key:
:param ascending:
:return:
"""
# can be an empty string is we want to retrieve with literally '' cloud
is_tenant_or_cloud = isinstance(tenant_or_cloud, str)
if is_tenant_or_cloud and not scope or scope and not type_:
raise AssertionError('invalid usage')
if type_ and scope and is_tenant_or_cloud:
key = COMPOUND_KEYS_SEPARATOR.join((type_, scope, tenant_or_cloud))
if by_prefix:
rkc = Parent.type_scope.startswith(key)
else:
rkc = (Parent.type_scope == key)
elif type_ and scope:
rkc = Parent.type_scope.startswith(COMPOUND_KEYS_SEPARATOR.join((
type_, scope, ''
)))
elif type_:
rkc = Parent.type_scope.startswith(
f'{type_}{COMPOUND_KEYS_SEPARATOR}')
else:
rkc = None
fc = None
if isinstance(is_deleted, bool):
fc = (Parent.is_deleted == is_deleted)
return Parent.customer_id_scope_index.query(
hash_key=customer_id,
range_key_condition=rkc,
limit=limit,
last_evaluated_key=last_evaluated_key,
scan_index_forward=ascending,
filter_condition=fc
)