modular_api/services/permissions_cache_service.py (180 lines of code) (raw):
import json
import os
from pathlib import Path
from modular_api.models.user_model import User
from modular_api.helpers.constants import ACTIVATED_STATE, ALLOWED_VALUES
from modular_api.helpers.exceptions import (
ModularApiUnauthorizedException, ModularApiBadRequestException,
)
from modular_api.helpers.jwt_auth import decode_jwt_token
from modular_api.helpers.log_helper import get_logger
from modular_api.helpers.password_util import secure_string
from modular_api.services import SERVICE_PROVIDER
from modular_api.services.group_service import GroupService
from modular_api.services.policy_service import PolicyService
from modular_api.services.user_service import UserService
from modular_api.web_service import WEB_SERVICE_PATH, COMMANDS_BASE_FILE_NAME
from modular_api.web_service.iam import filter_commands_by_permissions
# Module-level logger used by the service methods defined in this file.
_LOG = get_logger(__name__)
class PermissionsService:
    """Authenticate users and resolve the commands they are allowed to run.

    The commands catalogue is loaded once, when the service is constructed.
    Policy content collected for a group is cached per group name in
    ``group_allowed_commands_mapping`` and reused until a caller asks for
    the cache to be bypassed.
    """

    # Client-facing text is intentionally the same for every rejection
    # cause; the concrete reason is only written to the log.
    _ACCESS_REVOKED_MESSAGE = (
        'Provided credentials are invalid or the access was revoked, '
        'please contact service administrator'
    )

    def __init__(self, user_service, group_service, policy_service):
        """Load the commands catalogue and store the collaborating services.

        :param user_service: service used to look up users and recalculate
            their hashes
        :param group_service: service used to look up groups and
            recalculate their hashes
        :param policy_service: service used to look up policies and
            recalculate their hashes
        """
        self.available_commands = self.get_available_commands()
        # group name -> list of policy content entries collected for it
        self.group_allowed_commands_mapping = {}
        self.user_service: UserService = user_service
        self.group_service: GroupService = group_service
        self.policy_service: PolicyService = policy_service

    @staticmethod
    def get_available_commands() -> dict:
        """Load the commands catalogue shipped next to the web service.

        :return: parsed JSON content of the commands base file, or an empty
            dict when that file does not exist
        """
        commands_base_path = (
            Path(__file__).parent.parent
            / WEB_SERVICE_PATH
            / COMMANDS_BASE_FILE_NAME
        )
        _LOG.info(f'Getting available commands from {commands_base_path}')
        if not os.path.isfile(commands_base_path):
            # A missing catalogue is not treated as an error here: callers
            # simply get an empty set of commands.
            return {}
        with open(commands_base_path) as file:
            return json.load(file)

    @staticmethod
    def _describe_invalid_reason(is_hash_invalid, is_inactive):
        """Return a log-friendly reason why an item failed validation.

        :param is_hash_invalid: True when the recalculated hash differs
            from the stored one
        :param is_inactive: True when the item is not in the activated state
        :return: reason text, or None when neither flag is set
        """
        # The combined case must be tested first, otherwise it can never be
        # reported.
        if is_hash_invalid and is_inactive:
            return 'compromised item and inactive state'
        if is_hash_invalid:
            return 'compromised item'
        if is_inactive:
            return 'inactive state'
        return None

    def resolve_available_commands(self, group_names, empty_cache):
        """Filter the commands catalogue by the policies of the given groups.

        :param group_names: iterable of group names whose policy content
            is aggregated
        :param empty_cache: when truthy, policy content is regenerated for
            every group instead of being read from the cache
        :return: whatever ``filter_commands_by_permissions`` produces for
            the catalogue and the aggregated policy content
        :raises ModularApiUnauthorizedException: propagated from
            ``generate_allowed_commands``
        """
        _LOG.info(f"Available commands resolving for '{group_names}' groups")
        policy_aggregation = []
        cache = self.group_allowed_commands_mapping
        for group in group_names:
            if empty_cache or group not in cache:
                group_policy = self.generate_allowed_commands(
                    group_name=group)
            else:
                group_policy = cache[group]
            policy_aggregation.extend(group_policy)
        return filter_commands_by_permissions(
            available_commands=self.available_commands,
            group_policy=policy_aggregation)

    def generate_allowed_commands(self, group_name):
        """Collect the policy content attached to a group and cache it.

        The group and each of its policies are validated first: the
        recalculated hash must match the stored one and the state must be
        ``ACTIVATED_STATE``.

        :param group_name: name of the group to describe
        :return: list with the content entries of all the group's policies
            (the same list object that is stored in the cache)
        :raises ModularApiUnauthorizedException: when the group or any of
            its policies fails validation
        """
        _LOG.info(f'Available commands generating for {group_name} groups')
        # NOTE(review): presumably describe_group can return None for an
        # unknown group, which would fail below with AttributeError - confirm.
        group_item = self.group_service.describe_group(group_name=group_name)
        is_hash_invalid = (
            self.group_service.calculate_group_hash(group_item)
            != group_item.hash
        )
        is_group_disabled = group_item.state != ACTIVATED_STATE
        group_problem = self._describe_invalid_reason(
            is_hash_invalid, is_group_disabled)
        if group_problem is not None:
            _LOG.error(f'{group_item.group_name} group invalid. Possible '
                       f'reason: {group_problem}')
            raise ModularApiUnauthorizedException(
                self._ACCESS_REVOKED_MESSAGE)

        group_policy = []
        for policy in group_item.policies:
            policy_item = self.policy_service.describe_policy(
                policy_name=policy)
            is_policy_hash_invalid = (
                self.policy_service.calculate_policy_hash(
                    policy_item=policy_item)
                != policy_item.hash
            )
            is_policy_deactivated = policy_item.state != ACTIVATED_STATE
            policy_problem = self._describe_invalid_reason(
                is_policy_hash_invalid, is_policy_deactivated)
            if policy_problem is not None:
                # Name the offending policy, not only the owning group.
                _LOG.error(f'{policy} policy of {group_item.group_name} '
                           f'group invalid. Possible reason: '
                           f'{policy_problem}')
                raise ModularApiUnauthorizedException(
                    self._ACCESS_REVOKED_MESSAGE)
            group_policy.extend(policy_item.content)

        self.group_allowed_commands_mapping[group_item.group_name] = \
            group_policy
        return group_policy

    def get_user_item_or_raise_error(self, username: str) -> User:
        """Fetch the user record for ``username``.

        :param username: name of the user to look up
        :return: the item returned by ``user_service.describe_user``
        :raises ModularApiBadRequestException: when ``username`` is empty
        :raises ModularApiUnauthorizedException: when no user is found
        """
        if not username:
            _LOG.info('Username is empty')
            raise ModularApiBadRequestException('Username is empty')

        _LOG.info(f'Going to get user item by {username} username')
        user_item = self.user_service.describe_user(username=username)
        if not user_item:
            _LOG.info(f'[auth] User does not exist: {username}')
            raise ModularApiUnauthorizedException('User does not exist')
        return user_item

    def check_user_item_is_valid(self, user_item):
        """Ensure the user record is untampered and activated.

        :param user_item: user record to validate
        :raises ModularApiUnauthorizedException: when the recalculated hash
            differs from the stored one, or the state is not
            ``ACTIVATED_STATE``
        """
        _LOG.info(f'Going to check if {user_item.username} user able to '
                  f'perform commands')
        calculated_hash = self.user_service.calculate_user_hash(
            user_item=user_item
        )
        if calculated_hash != user_item.hash:
            _LOG.error(f'{user_item.username} user item compromised')
            raise ModularApiUnauthorizedException(
                self._ACCESS_REVOKED_MESSAGE)
        if user_item.state != ACTIVATED_STATE:
            _LOG.error(f'{user_item.username} user item in inactive state')
            raise ModularApiUnauthorizedException(
                self._ACCESS_REVOKED_MESSAGE)

    def authenticate_user(
            self,
            username: str,
            password: str | None = None,
            token: str | None = None,
            empty_cache: bool = False,
    ) -> tuple:
        """Authenticate by JWT or password and resolve allowed commands.

        When ``token`` is given it takes precedence: the username is taken
        from the token payload and ``password`` is not checked.

        :param username: user name; replaced by the one from the token
            when ``token`` is provided
        :param password: plain password to verify when no token is given
        :param token: JWT to decode
        :param empty_cache: forwarded to ``resolve_available_commands``
        :return: ``(available_commands, user_meta)`` where ``user_meta`` is
            the ``ALLOWED_VALUES`` entry of the user's meta, or ``{}``
        :raises ModularApiUnauthorizedException: when neither credential is
            given, the credential is invalid, or the user, a group or a
            policy fails validation
        :raises ModularApiBadRequestException: when the username is empty
        """
        if not (password or token):
            raise ModularApiUnauthorizedException(
                'Password or token was not provided'
            )

        if token:
            username = self.validate_jwt_and_get_user(token=token)
            if not username:
                # username is falsy at this point, so the message carries
                # no identity; text kept as-is for log compatibility.
                _LOG.info(
                    f'[auth] Invalid JWT: {username}. Access denied')
                raise ModularApiUnauthorizedException('Access denied')

        # Lookup and integrity check are common to both credential kinds.
        user_item = self.get_user_item_or_raise_error(username=username)
        self.check_user_item_is_valid(user_item=user_item)

        if not token:
            is_password_valid = self.validate_password(
                username=username,
                user_password=user_item.password,
                provided_password=password,
            )
            if not is_password_valid:
                _LOG.info(
                    f'[auth] Invalid password: {username}. Access denied'
                )
                raise ModularApiUnauthorizedException('Access denied')

        available_commands = self.resolve_available_commands(
            group_names=user_item.groups,
            empty_cache=empty_cache
        )
        user_meta = {}
        if user_item.meta:
            user_meta = user_item.meta.as_dict().get(ALLOWED_VALUES, {})
        return available_commands, user_meta

    @staticmethod
    def validate_jwt_and_get_user(token: str) -> str | None:
        """Decode a JWT and return the username stored in its payload.

        :param token: JWT to decode with ``decode_jwt_token``
        :return: value of the ``username`` payload key, or None when the
            decoded payload is empty or has no such key
        """
        payload = decode_jwt_token(token)
        if not payload:
            _LOG.info('[auth] User is NOT authenticated with JWT')
            return None
        username = payload.get('username')
        _LOG.info(f'[auth] User {username} is authenticated with JWT')
        return username

    @staticmethod
    def validate_password(username, user_password, provided_password):
        """Check a provided password against the stored secured value.

        :param username: user name, used for logging only
        :param user_password: stored value to compare against
        :param provided_password: password supplied by the caller; it is
            passed through ``secure_string`` before the comparison
        :return: True when the values match, False otherwise
        """
        # NOTE(review): plain equality is used for the comparison; consider
        # hmac.compare_digest if secure_string's output type allows it.
        if secure_string(provided_password) == user_password:
            _LOG.info(f'[auth] User {username} is authenticated by password')
            return True
        _LOG.info(f'[auth] User {username} is NOT authenticated by password')
        return False
def permissions_handler_instance():
    """Create a PermissionsService wired to the SERVICE_PROVIDER services."""
    provider = SERVICE_PROVIDER
    dependencies = {
        'user_service': provider.user_service,
        'group_service': provider.group_service,
        'policy_service': provider.policy_service,
    }
    return PermissionsService(**dependencies)