annotation/annotation/categories/services.py (378 lines of code) (raw):
import uuid
from typing import Dict, List, Optional, Set, Tuple, Union
from cachetools import TTLCache, cached, keys
from filter_lib import Page, form_query, map_request_to_filter, paginate
from sqlalchemy import and_, null, or_
from sqlalchemy.event import listens_for
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import func
from sqlalchemy_utils import Ltree
from annotation import logger as app_logger
from annotation.errors import (
CheckFieldError,
ForeignKeyError,
NoSuchCategoryError,
SelfParentError,
)
from annotation.filters import CategoryFilter
from annotation.models import Category, Job
from annotation.schemas import (
CategoryInputSchema,
CategoryORMSchema,
CategoryResponseSchema,
PageSchema,
)
# Shared cache used by the @cached decorator on recursive_subcategory_search:
# at most 128 entries, each kept for ttl=300 (seconds with cachetools'
# default timer). clear_child_categories_cache empties it.
cache = TTLCache(maxsize=128, ttl=300)
# Module-level alias for the application's Logger object.
logger = app_logger.Logger
def is_category_leaf(db: Session, category: Category, tenant: str) -> bool:
    """Tell whether ``category`` has no child visible to ``tenant``.

    A child is a category whose ``parent`` equals ``category.id`` and whose
    tenant is either ``tenant`` or NULL.

    :param db: session used to run the lookup.
    :param category: category whose children are searched for.
    :param tenant: tenant whose categories are considered.
    :return: True when no such child row is found.
    """
    visible_to_tenant = or_(
        Category.tenant == tenant, Category.tenant == null()
    )
    child_of_category = Category.parent == category.id
    first_child = (
        db.query(Category.id)
        .filter(and_(child_of_category, visible_to_tenant))
        .first()
    )
    return not first_child
def set_parents_is_leaf(
    category_db: Category,
    parents: Optional[List[CategoryResponseSchema]] = None,
    is_leaf: bool = False,
) -> CategoryResponseSchema:
    """Convert ``category_db`` to a response object and attach tree info.

    :param category_db: ORM category to convert.
    :param parents: value stored in ``parents``; an empty list when omitted.
    :param is_leaf: value stored in ``is_leaf``.
    :return: the response object with both attributes set.
    """
    response = response_object_from_db(category_db)
    response.is_leaf = is_leaf
    response.parents = [] if parents is None else parents
    return response
def insert_category_tree(
    db: Session, category_db: Category, tenant: str
) -> CategoryResponseSchema:
    """Build the response for ``category_db`` with parents and leaf flag.

    The parents are only attached when the response has a truthy ``parent``;
    each one is converted with ``set_parents_is_leaf`` using its defaults.

    :param db: session used for the parent and leaf lookups.
    :param category_db: ORM category to describe.
    :param tenant: tenant used for the leaf check.
    :return: response object for the category.
    """
    ancestors = fetch_category_parents(db, category_db)
    leaf_flag = is_category_leaf(db, category_db, tenant)
    response = response_object_from_db(category_db)
    if response.parent:
        response.parents = list(map(set_parents_is_leaf, ancestors))
    response.is_leaf = leaf_flag
    return response
def add_category_db(
    db: Session, category_input: CategoryInputSchema, tenant: str
) -> Category:
    """Create a category for ``tenant`` and commit it.

    :param db: session used for validation queries and the insert.
    :param category_input: data of the category to create; when it carries
        no id a random ``uuid4`` hex string is generated.
    :param tenant: tenant that will own the new category.
    :return: the committed ``Category`` object.
    :raises SelfParentError: if the given id equals the given parent.
    :raises CheckFieldError: if the id or the name is already in use
        (raised by ``check_unique_category_field``).
    :raises ForeignKeyError: if the parent belongs to another tenant.
    """
    name = category_input.name
    id_ = category_input.id
    parent = category_input.parent
    if parent is not None and id_ == parent:
        raise SelfParentError("Category cannot be its own parent.")
    # The string "null" means "no parent". Resolve it once, before the parent
    # lookup, so the stored parent and the computed tree path always agree.
    if parent == "null":
        parent = None
    if id_:
        check_unique_category_field(db, id_, "id", tenant)
    check_unique_category_field(db, name, "name", tenant)
    parent_db = db.query(Category).get(parent) if parent else None
    if parent_db and parent_db.tenant not in [tenant, None]:
        raise ForeignKeyError("Category with this id doesn't exist.")
    id_ = id_ or uuid.uuid4().hex
    # The tree path is the parent's path plus this id, or just the id when
    # there is no parent (or the parent has no tree).
    if parent_db and parent_db.tree:
        tree = Ltree(f"{parent_db.tree.path}.{id_}")
    else:
        tree = Ltree(f"{id_}")
    category = Category(
        id=id_,
        name=name,
        tenant=tenant,
        parent=parent,
        metadata_=category_input.metadata,
        editor=category_input.editor,
        data_attributes=category_input.data_attributes,
        type=category_input.type,
        tree=tree,
    )
    db.add(category)
    db.commit()
    return category
def response_object_from_db(category_db: Category) -> CategoryResponseSchema:
    """Turn an ORM category into a ``CategoryResponseSchema``.

    The object is first read through ``CategoryORMSchema`` and the resulting
    dict is then parsed into the response schema.
    """
    orm_view = CategoryORMSchema.from_orm(category_db)
    return CategoryResponseSchema.parse_obj(orm_view.dict())
def fetch_category_parents(
    db: Session, category_input: Category
) -> List[Category]:
    """Return the ancestors of ``category_input`` ordered by tree path.

    :param db: session used for the query.
    :param category_input: category whose ``tree`` is matched.
    :return: all matching rows except the last one.
    """
    lineage = (
        db.query(Category)
        .filter(Category.tree.ancestor_of(category_input.tree))
        .order_by(Category.tree.asc())
        .all()
    )
    # The last row in ascending tree order is the category itself; drop it.
    return lineage[:-1]
def fetch_category_children(
    db: Session, category_input: Category
) -> List[Category]:
    """Return every descendant of ``category_input``, excluding itself.

    The category is excluded by id rather than by skipping the first row:
    without an ORDER BY the database may return rows in any order, so
    ``offset(1)`` could drop a real descendant instead.

    :param db: session used for the query.
    :param category_input: category whose subtree is fetched.
    :return: descendants ordered by ascending tree path.
    """
    return (
        db.query(Category)
        .filter(
            Category.tree.descendant_of(category_input.tree),
            Category.id != category_input.id,
        )
        .order_by(Category.tree.asc())
        .all()
    )
def check_unique_category_field(
    db: Session, value: str, field: str, tenant: str
) -> None:
    """Ensure no category visible to ``tenant`` already has ``field == value``.

    Categories of ``tenant`` and categories with a NULL tenant are checked.

    :param db: session used for the EXISTS query.
    :param value: value that must not be taken yet.
    :param field: name of the ``Category`` attribute to compare.
    :param tenant: tenant whose categories are considered.
    :raises CheckFieldError: if a matching category exists.
    """
    visible_to_tenant = or_(
        Category.tenant == tenant, Category.tenant == null()
    )
    matching = (
        db.query(Category)
        .filter(visible_to_tenant)
        .filter_by(**{field: value})
    )
    already_taken = db.query(matching.exists()).scalar()
    if already_taken:
        raise CheckFieldError(f"Category {field} must be unique.")
def fetch_category_db(db: Session, category_id: str, tenant: str) -> Category:
    """Load a category by id if ``tenant`` is allowed to see it.

    A category is returned when it exists and its tenant is either empty or
    equal to ``tenant``.

    :param db: session used for the lookup.
    :param category_id: primary key of the category.
    :param tenant: tenant requesting the category.
    :return: the matching ``Category``.
    :raises NoSuchCategoryError: if the category is missing or belongs to
        another tenant.
    """
    category = db.query(Category).get(category_id)
    if category:
        belongs_elsewhere = category.tenant and category.tenant != tenant
        if not belongs_elsewhere:
            return category
    raise NoSuchCategoryError(
        f"Category with id: {category_id} doesn't exist"
    )
@listens_for(Category, "after_insert")
@listens_for(Category, "after_update")
@listens_for(Category, "after_delete")
def clear_child_categories_cache(*_):
    """Empty the cache used by recursive_subcategory_search.

    Registered as a listener for the Category insert, update and delete
    events, so cached results are discarded whenever one of them fires.
    The listener arguments are ignored.
    """
    cache.clear()
def key_without_db_session(*args):
    """Build the cache key for a recursive_subcategory_search call.

    Only the string arguments take part in the key. The Session argument
    must be left out because it is unique for every call; any other
    non-string argument is left out as well.
    """
    return keys.hashkey(*(arg for arg in args if isinstance(arg, str)))
@cached(cache=cache, key=key_without_db_session)
def recursive_subcategory_search(
    db: Session, category: str, root_id: str, child_categories: Set[str]
):
    """Collect the ids of all subcategories of ``category``.

    Walks the parent-child hierarchy recursively and adds every subcategory
    id found to ``child_categories``, which is mutated in place and also
    returned. The 'not-self-parent' constraint only prevents a category from
    being its own direct parent; in a cyclic hierarchy it may still appear
    below one of its own subcategories. Ids that were already collected, the
    current category and ``root_id`` are skipped, which prevents infinite
    recursion, but the caller must still discard the root category from the
    returned set explicitly.

    Results are cached per ``(category, root_id)`` (see
    ``key_without_db_session``).

    :param db: session used to query direct children.
    :param category: id of the category whose children are looked up.
    :param root_id: id of the category the whole search started from.
    :param child_categories: set that accumulates the subcategory ids.
    :return: ``child_categories`` with all subcategory ids added.
    """
    skipped_categories = {*child_categories, category, root_id}
    child_ids = [
        child.id
        for child in db.query(Category).filter_by(parent=category).all()
        if child.id not in skipped_categories
    ]
    child_categories.update(child_ids)
    for child_id in child_ids:
        # Merge the returned set instead of relying on in-place mutation
        # only: on a cache hit the function body does not run, so the
        # child's subcategories would otherwise be missing from this set.
        child_categories.update(
            recursive_subcategory_search(
                db, child_id, root_id, child_categories
            )
        )
    return child_categories
def fetch_bunch_categories_db(
    db: Session,
    category_ids: Set[str],
    tenant: str,
    job_id: Optional[int] = None,
    root_parents: bool = False,  # If true, add categories parents up to root
) -> List[Category]:
    """Fetch the categories with the given ids that ``tenant`` may see.

    :param db: session used for the queries.
    :param category_ids: ids that must all be found.
    :param tenant: tenant whose categories (plus NULL-tenant ones) qualify.
    :param job_id: when given, only categories joined to this job qualify.
    :param root_parents: when true, the categories returned by
        ``_get_parents`` are added to the result.
    :return: list of matching categories.
    :raises NoSuchCategoryError: if the ids found differ from the ids asked
        for; the message lists the differing ids in sorted order.
    """
    query = db.query(Category)
    if job_id is not None:
        query = query.join(Category.jobs).filter(Job.job_id == job_id)
    tenant_scope = or_(Category.tenant == tenant, Category.tenant == null())
    categories = query.filter(
        and_(Category.id.in_(category_ids), tenant_scope)
    ).all()

    found_ids = {str(category.id) for category in categories}
    wrong_categories = found_ids.symmetric_difference(category_ids)
    if wrong_categories:
        error_message = ", ".join(sorted(wrong_categories))
        raise NoSuchCategoryError(f"No such categories: {error_message}")

    if not root_parents:
        return categories
    categories_parents = _get_parents(db, categories, tenant, job_id)
    parent_objects = (
        cat for cats in categories_parents.values() for cat in cats
    )
    return list(set(categories).union(parent_objects))
# Aliases naming the roles that plain built-in types play in the helpers
# below.
# Category id; used for the keys of Leaves.
CategoryIdT = str
# Dotted tree path (Category.tree.path); used for the keys of Parents.
CategoryPathT = str
# Leaf flag; _get_leaves starts at True and sets False once a child is found.
IsLeafT = bool
# Category id -> leaf flag, as built by _get_leaves.
Leaves = Dict[CategoryIdT, IsLeafT]
# Tree path -> categories for every node of the path except the last one,
# as built by _get_parents.
Parents = Dict[CategoryPathT, List[Category]]
def _get_leaves(
    db: Session,
    categories: List[Category],
    tenant: str,
    job_id: Optional[int] = None,
) -> Leaves:
    """Work out which of ``categories`` have no children.

    Every category starts as a leaf; each one that turns up as the parent of
    a category visible to ``tenant`` (and joined to ``job_id`` when that is
    given) is then marked as not a leaf.

    :return: mapping of category id to its leaf flag.
    """
    leaves: Leaves = dict.fromkeys((c.id for c in categories), True)
    query = db.query(Category)
    if job_id is not None:
        query = query.join(Category.jobs).filter(Job.job_id == job_id)
    tenant_scope = or_(Category.tenant == tenant, Category.tenant == null())
    children = query.filter(
        and_(Category.parent.in_(leaves.keys()), tenant_scope)
    ).all()
    for child in children:
        leaves[child.parent] = False
    return leaves
def _extract_category(
path: str, categories: Dict[str, Category]
) -> List[Category]:
return [categories[node] for node in path.split(".")[0:-1]]
def _get_parents(
    db: Session,
    categories: List[Category],
    tenant: str,
    job_id: Optional[int] = None,
) -> Parents:
    """Resolve the parent chain for the tree path of every category.

    All node ids occurring in the trees of ``categories`` are fetched with
    ``fetch_bunch_categories_db``; each distinct tree path is then mapped to
    the categories of its nodes, excluding the last node.

    :return: mapping of tree path to the list of parent categories.
    """
    node_ids = set()
    tree_paths = set()
    for cat in categories:
        # Categories without a tree are skipped; passing root categories
        # here caused an exception.
        if cat.tree is None:
            continue
        tree_paths.add(cat.tree.path)
        node_ids.update(node.path for node in cat.tree)
    fetched = fetch_bunch_categories_db(db, node_ids, tenant, job_id)
    category_to_object = {cat.id: cat for cat in fetched}
    return {
        path: _extract_category(path, category_to_object)
        for path in tree_paths
    }
def _compose_response(
    categories: List[Category], leaves: Leaves, parents: Parents
) -> List[CategoryResponseSchema]:
    """Build response objects for ``categories`` with leaf and parent info.

    :param categories: categories to convert.
    :param leaves: leaf flag per category id; a missing id counts as False.
    :param parents: parent categories per tree path; every parent is
        converted with ``is_leaf`` set to False.
    :return: one response object per category, in input order.
    """

    def _to_response(cat, **extra):
        # Fields read from the ORM object, overridden by the extra values.
        return CategoryResponseSchema.parse_obj(
            {**CategoryORMSchema.from_orm(cat).dict(), **extra}
        )

    converted_parents = {
        path: [_to_response(parent, is_leaf=False) for parent in chain]
        for path, chain in parents.items()
    }
    responses = []
    for cat in categories:
        if cat.tree:
            parent_chain = converted_parents.get(cat.tree.path, [])
        else:
            parent_chain = []
        responses.append(
            _to_response(
                cat,
                is_leaf=leaves.get(cat.id, False),
                parents=parent_chain,
            )
        )
    return responses
def _get_child_categories(
    db: Session,
    request: CategoryFilter,
    tenant: str,
    job_id: Optional[int] = None,
) -> Tuple:
    """Run the filtered category query described by ``request``.

    Without ``job_id`` the categories of ``tenant`` and NULL-tenant
    categories are searched; with it, the categories joined to that job of
    ``tenant``.

    :return: tuple of the query results and the pagination object produced
        by ``form_query``.
    """
    base_query = db.query(Category)
    if job_id is None:
        base_query = base_query.filter(
            or_(Category.tenant == tenant, Category.tenant == null())
        )
    else:
        base_query = base_query.join(Category.jobs).filter(
            and_(Job.job_id == job_id, Job.tenant == tenant)
        )
    filter_args = map_request_to_filter(request.dict(), Category.__name__)
    filtered_query, pagination = form_query(filter_args, base_query)
    return filtered_query.all(), pagination
def filter_category_db(
    db: Session,
    request: CategoryFilter,
    tenant: str,
    job_id: Optional[int] = None,
) -> Page[Union[CategoryResponseSchema, str, dict]]:
    """Return one page of categories matching ``request``.

    When any filter uses the "distinct" operator the raw query results are
    paginated as they are; otherwise each category is converted to a
    response object carrying its leaf flag and parents.
    """
    child_categories, pagination = _get_child_categories(
        db, request, tenant, job_id
    )
    if request.filters:
        operators = [item.operator.value for item in request.filters]
    else:
        operators = []
    if "distinct" in operators:
        return paginate(child_categories, pagination)

    leaves = _get_leaves(db, child_categories, tenant, job_id)
    parents = _get_parents(db, child_categories, tenant, job_id)
    response = _compose_response(child_categories, leaves, parents)
    return paginate(response, pagination)
def update_category_tree(
    db: Session,
    category_db: Category,
    new_parent: Category = None,
) -> None:
    """Rewrite the tree path of ``category_db`` and of all its descendants.

    Each affected path is cut with ``subpath`` at the category's own level
    and, when ``new_parent`` is given, prefixed with the new parent's path.
    The update is issued as one bulk statement without synchronising the
    session; nothing is committed here.
    """
    current_tree = category_db.tree
    # Offset passed to subpath(): the length of the current tree minus one.
    levels_above = len(current_tree) - 1
    relocated_path = func.subpath(Category.tree, levels_above)
    if new_parent:
        relocated_path = new_parent.tree.path + relocated_path
    # "<@" selects every row whose tree lies at or below the current tree.
    subtree = db.query(Category).filter(
        Category.tree.op("<@")(current_tree)
    )
    subtree.update(values={"tree": relocated_path}, synchronize_session=False)
def update_category_db(
    db: Session, category_id: str, update_query: dict, tenant: str
) -> Category:
    """Apply ``update_query`` to an existing category and commit.

    ``update_query`` is modified in place: a ``"null"`` parent becomes None,
    ``metadata_`` is filled from ``metadata`` and ``id`` is set to
    ``category_id``.

    :param db: session used for lookups and the update.
    :param category_id: id of the category to update.
    :param update_query: new field values; must contain ``parent`` and
        ``name``.
    :param tenant: tenant performing the update.
    :return: the updated, committed category.
    :raises NoSuchCategoryError: if the category is missing or belongs to
        another tenant.
    :raises CheckFieldError: if the category is a default (NULL-tenant) one,
        or the new name is already in use.
    :raises SelfParentError: if the new parent is the category itself.
    :raises ForeignKeyError: if the new parent belongs to another tenant.
    """
    category = db.query(Category).get(category_id)
    if not category or category.tenant not in [tenant, None]:
        raise NoSuchCategoryError("Cannot update category that doesn't exist")
    if category.tenant is None:
        raise CheckFieldError("Cannot update default category.")
    if category_id == update_query["parent"]:
        raise SelfParentError("Category cannot be its own parent.")
    # The string "null" means "no parent".
    if update_query["parent"] == "null":
        update_query["parent"] = None
    ex_parent_id = category.parent
    new_parent_id = update_query["parent"]
    parent_db = (
        db.query(Category).get(new_parent_id) if new_parent_id else None
    )
    if parent_db and parent_db.tenant not in [tenant, None]:
        raise ForeignKeyError("Category with this id doesn't exist.")
    # Compare and look up the plain string (a stray trailing comma used to
    # turn it into a 1-tuple). The uniqueness check is only needed when the
    # name changes; otherwise the category would clash with itself.
    name = update_query["name"]
    if name != category.name:
        check_unique_category_field(db, name, "name", tenant)
    update_query["metadata_"] = update_query.get("metadata")
    update_query["id"] = category_id
    for field, value in update_query.items():
        setattr(category, field, value)
    # Moving the category means the paths of its whole subtree must change.
    if ex_parent_id != new_parent_id and category.tree:
        update_category_tree(db, category, parent_db)
    db.add(category)
    db.commit()
    return category
def delete_category_db(db: Session, category_id: str, tenant: str) -> None:
    """Delete the category with ``category_id`` and commit.

    :param db: session used for the lookup and the delete.
    :param category_id: id of the category to remove.
    :param tenant: tenant performing the delete.
    :raises NoSuchCategoryError: if the category is missing or belongs to
        another tenant.
    :raises CheckFieldError: if the category is a default (NULL-tenant) one.
    """
    category = db.query(Category).get(category_id)
    accessible = category and category.tenant in [tenant, None]
    if not accessible:
        raise NoSuchCategoryError("Cannot delete category that doesn't exist")
    if category.tenant is None:
        raise CheckFieldError("Cannot delete default category.")
    db.delete(category)
    db.commit()
def combine_categories(
    categories: Optional[Set[str]], pages: Optional[List[PageSchema]]
) -> Set[str]:
    """Merge explicit category ids with those referenced by page objects.

    :param categories: category ids given directly; None or empty is
        treated as an empty set.
    :param pages: pages whose ``objs`` entries may carry a ``"category"``
        key; None is treated as no pages. Entries with a missing or falsy
        category are ignored.
    :return: a new set with the union of both sources; the input set is not
        modified.
    """
    combined = set(categories) if categories else set()
    # `pages` is Optional, so guard against None before iterating.
    for page in pages or []:
        for obj in page.objs:
            cat = obj.get("category")
            if cat:
                combined.add(cat)
    return combined