modular_api/web_service/response_processor.py (97 lines of code) (raw):
import urllib
from modular_api.helpers.exceptions import ModularApiInternalException, \
ModularApiBadRequestException, ModularApiUnauthorizedException
from pynamodb.exceptions import GetError
from modular_api.helpers.log_helper import get_logger
M3MODULAR_ERROR_TYPE_KEY = 'error_type'
M3MODULAR_ERROR_MESSAGE_KEY = 'message'
_LOG = get_logger(__name__)
def build_exception_content(exception):
if isinstance(exception, GetError) and getattr(
exception, 'cause_response_code', '') == 'ExpiredTokenException':
exception = ModularApiUnauthorizedException(
'Token expired.')
if not hasattr(exception, 'code'):
exception = ModularApiInternalException('Exception occurred')
code = exception.code
error_type = exception.__class__.__name__
error_message = str(exception)
content = {
M3MODULAR_ERROR_TYPE_KEY: error_type,
M3MODULAR_ERROR_MESSAGE_KEY: error_message
}
return code, content
def __check_user_allowed_values(user_meta, requested_params):
requested_params_names = requested_params.keys()
for parameter in requested_params_names:
if parameter in user_meta.keys():
allow_list = [name.lower() for name in user_meta[parameter]
if isinstance(name, str)]
user_value = requested_params[parameter].lower() \
if isinstance(requested_params[parameter], str) \
else requested_params[parameter]
if user_value not in allow_list:
invalid_requested_parameter_message = \
f'Invalid request for your user. Allowed value(s) ' \
f'for \'{parameter}\': {user_meta[parameter]}'
_LOG.error(invalid_requested_parameter_message)
raise ModularApiBadRequestException(
invalid_requested_parameter_message
)
def validate_request(command, req_params, method, user_meta):
if command['route']['method'] != method:
raise ModularApiBadRequestException(
f'The command {command["route"]["method"]} '
f'is not available by method {method}')
command_def_params = command['parameters']
required_params = [param for param in command_def_params if
param['required']]
all_param_names = [param['name'] for param in command_def_params]
if len(all_param_names) == len(req_params) or len(req_params) > len(
all_param_names):
if not set(all_param_names) == set(req_params):
wrong_parameters_specified_message = 'Wrong parameters specified'
_LOG.error(wrong_parameters_specified_message)
raise ModularApiBadRequestException(
wrong_parameters_specified_message
)
else:
required_params_names = [param['name'] for param in
required_params]
not_provided_params = set(required_params_names).difference(req_params)
if not_provided_params:
missed_params_error_message = \
f'Not all required parameters specified: ' \
f'{", ".join(not_provided_params)}'
_LOG.error(missed_params_error_message)
raise ModularApiBadRequestException(missed_params_error_message)
__check_user_allowed_values(
user_meta=user_meta,
requested_params=req_params
)
return req_params
def extract_and_convert_parameters(request, command_def):
result = {}
if request.method == 'GET':
query_string = request.query_string
pairs = query_string.split('&')
type_map = {item['name']: item['type']
for item in command_def['parameters']}
for pair in pairs:
split = pair.split('=')
if len(split) == 2:
param_name, param_value = split
if '+' in param_value:
param_value = param_value.replace('+', ' ')
value = urllib.parse.unquote(param_value)
param = urllib.parse.unquote(param_name)
result[param] = value
else:
result = {} if not request.json else request.json
return result
def get_group_path(mount_point, group_name):
if mount_point == group_name:
return group_name
elif mount_point == '/':
return '/' + group_name
return '/'.join([mount_point, group_name])