redash/permissions.py (76 lines of code) (raw):
import functools
from flask_login import current_user
from flask_restful import abort
from funcy import flatten
view_only = True
not_view_only = False
ACCESS_TYPE_VIEW = "view"
ACCESS_TYPE_MODIFY = "modify"
ACCESS_TYPE_DELETE = "delete"
ACCESS_TYPES = (ACCESS_TYPE_VIEW, ACCESS_TYPE_MODIFY, ACCESS_TYPE_DELETE)
def has_access(obj, user, need_view_only):
if hasattr(obj, "api_key") and user.is_api_user():
return has_access_to_object(obj, user.id, need_view_only)
else:
return has_access_to_groups(obj, user, need_view_only)
def has_access_to_object(obj, api_key, need_view_only):
if obj.api_key == api_key:
return need_view_only
elif hasattr(obj, "dashboard_api_keys"):
# check if api_key belongs to a dashboard containing this query
return api_key in obj.dashboard_api_keys and need_view_only
else:
return False
def has_access_to_groups(obj, user, need_view_only):
groups = obj.groups if hasattr(obj, "groups") else obj
if "admin" in user.permissions:
return True
matching_groups = set(groups.keys()).intersection(user.group_ids)
if not matching_groups:
return False
required_level = 1 if need_view_only else 2
group_level = 1 if all(flatten([groups[group] for group in matching_groups])) else 2
return required_level <= group_level
def require_access(obj, user, need_view_only):
if not has_access(obj, user, need_view_only):
abort(403)
class require_permissions(object):
def __init__(self, permissions, allow_one=False):
self.permissions = permissions
self.allow_one = allow_one
def __call__(self, fn):
@functools.wraps(fn)
def decorated(*args, **kwargs):
if self.allow_one:
has_permissions = any([current_user.has_permission(permission) for permission in self.permissions])
else:
has_permissions = current_user.has_permissions(self.permissions)
if has_permissions:
return fn(*args, **kwargs)
else:
abort(403)
return decorated
def require_permission(permission):
return require_permissions((permission,))
def require_any_of_permission(permissions):
return require_permissions(permissions, True)
def require_admin(fn):
return require_permission("admin")(fn)
def require_super_admin(fn):
return require_permission("super_admin")(fn)
def has_permission_or_owner(permission, object_owner_id):
return int(object_owner_id) == current_user.id or current_user.has_permission(
permission
)
def is_admin_or_owner(object_owner_id):
return has_permission_or_owner("admin", object_owner_id)
def require_permission_or_owner(permission, object_owner_id):
if not has_permission_or_owner(permission, object_owner_id):
abort(403)
def require_admin_or_owner(object_owner_id):
if not is_admin_or_owner(object_owner_id):
abort(403, message="You don't have permission to edit this resource.")
def can_modify(obj, user):
return is_admin_or_owner(obj.user_id) or user.has_access(obj, ACCESS_TYPE_MODIFY)
def require_object_modify_permission(obj, user):
if not can_modify(obj, user):
abort(403)