modular_api/helpers/decorators.py (244 lines of code) (raw):
import json
import os
import sys
from copy import deepcopy
from functools import wraps
import click
from prettytable import PrettyTable
from modular_api.helpers.constants import CLI_VIEW, TABLE_VIEW, \
MODULAR_API_RESPONSE, MAX_COLUMNS_WIDTH, JSON_VIEW
from modular_api.helpers.date_utils import utc_time_now
from modular_api.helpers.exceptions import ModularApiBaseException, \
ModularApiBadRequestException
from modular_api.helpers.log_helper import get_logger, API_LOGS_FILE
from modular_api.services import SERVICE_PROVIDER
_LOG = get_logger(__name__)
new_line = os.linesep
class BaseCommand(click.core.Command):
    """
    Click command that registers two extra flag options, ``--table`` and
    ``--json``, after whatever options the command already declares.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both output-format flags are built the same way; only the flag
        # name and the view name mentioned in the help text differ.
        for flag, view_name in (('--table', 'Table'), ('--json', 'JSON')):
            output_option = click.core.Option(
                (flag,), is_flag=True,
                help=f'Use this parameter to show command`s response '
                     f'in a {view_name} view'
            )
            self.params.append(output_option)

    def main(self, *args, **kwargs):
        """
        Delegate to ``click.core.Command.main`` and re-raise any
        ``Exception`` as ``ModularApiBaseException`` carrying the original
        exception's text.
        """
        try:
            return super().main(*args, **kwargs)
        except Exception as error:
            raise ModularApiBaseException(str(error))
class ResponseDecorator:
    """
    Wrapper for formatting cli command response
    :param stdout: function which prints response to the end user
    :param error_message: message that will be displayed in case command
        failed to execute (stored on the instance; the wrapper below does
        not read it)
    :param custom_view: when True the CLI view is always used and the
        output-format flags are left untouched in the command's kwargs
    :return:
    """

    def __init__(self, stdout, error_message: str, custom_view: bool = False):
        # Despite the name this is a print-like callable, not a stream
        self.stdout = stdout
        self.error_message = error_message
        self.custom_view = custom_view

    def __call__(self, fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            # The format is resolved before the command runs because
            # resolving it removes the format flags from kwargs.
            if self.custom_view:
                view_format = CLI_VIEW
            else:
                view_format = resolve_output_format(kwargs=kwargs)

            try:
                outcome = fn(*args, **kwargs)
            except ModularApiBaseException as known_error:
                _LOG.info('ModularApiBaseException occurred')
                outcome = CommandResponse(message=str(known_error),
                                          error=True)
            except Exception:
                # The traceback goes to the log; the user only gets a
                # pointer to the log file.
                _LOG.exception('Unexpected exception occurred')
                outcome = CommandResponse(
                    message=(f'Unexpected exception occurs.{os.linesep}'
                             f'See detailed info and traceback in '
                             f'{API_LOGS_FILE}'),
                    error=True,
                )

            formatter = ResponseFormatter(function_result=outcome,
                                          view_format=view_format)
            printed = self.stdout(formatter.prettify_response())
            # Failed commands terminate the process with a non-zero status
            # once the error has been shown.
            if outcome.error:
                sys.exit(1)
            return printed

        return decorated
def resolve_output_format(kwargs):
    """
    Pick the output view requested through the command's keyword arguments.

    The ``TABLE_VIEW`` and ``JSON_VIEW`` entries are popped from ``kwargs``
    (the mapping is modified in place), so they are not forwarded further.
    :param kwargs: keyword arguments the command was invoked with
    :return: ``TABLE_VIEW`` or ``JSON_VIEW`` when the matching flag is set,
        otherwise ``CLI_VIEW``
    :raises ModularApiBadRequestException: when both flags are set
    """
    wants_table = kwargs.pop(TABLE_VIEW, False)
    wants_json = kwargs.pop(JSON_VIEW, False)

    if wants_table and wants_json:
        raise ModularApiBadRequestException(
            'Please specify only one parameter - table or json'
        )
    if wants_json:
        return JSON_VIEW
    if wants_table:
        return TABLE_VIEW
    return CLI_VIEW
class ExceptionDecorator:
    """
    Wrapper for formatting only error response

    A successful call returns the wrapped function's result untouched.
    When the call raises, the error is formatted, handed to ``stdout`` and
    the process exits with status 1.
    :param stdout: function which prints response to the end user
    :param error_message: message that will be displayed in case command
        failed to execute (stored on the instance; the wrapper below does
        not read it)
    :return:
    """

    def __init__(self, stdout, error_message):
        self.stdout = stdout
        self.error_message = error_message

    def __call__(self, fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            # Resolving the format also removes the format flags from
            # kwargs before they reach the wrapped function.
            view_format = resolve_output_format(kwargs=kwargs)
            try:
                return fn(*args, **kwargs)
            except ModularApiBaseException as context:
                _LOG.info('ModularApiBaseException occurred')
                resp = CommandResponse(message=str(context), error=True)
            except Exception:
                # The traceback goes to the log; the user only gets a
                # pointer to the log file.
                _LOG.exception('Unexpected exception occurred')
                message = f'Unexpected exception occurs.{os.linesep}' \
                          f'See detailed info and traceback in ' \
                          f'{API_LOGS_FILE}'
                resp = CommandResponse(message=message, error=True)
            func_result = ResponseFormatter(function_result=resp,
                                            view_format=view_format)
            self.stdout(func_result.prettify_response())
            # Only the failure path reaches this point. ``sys.exit(1)``
            # replaces the ``site`` builtin ``exit()``, which is meant for
            # the interactive shell, may be absent (e.g. ``python -S``) and
            # reported success (status 0) even though the command failed.
            sys.exit(1)

        return decorated
class CommandResponse:
    """
    Container for the outcome of a command.
    :param message: plain text result
    :param items: result entries intended for table/JSON rendering
    :param warnings: warnings to show together with the result; a falsy
        value is replaced by an empty list
    :param table_title: title used when ``items`` are rendered as a table
    :param error: True when the command failed
    """

    def __init__(self, message: str = None, items: list = None,
                 warnings=None, table_title=None, error=False):
        self.error = error
        self.message = message
        self.warnings = warnings if warnings else []
        self.items = items
        self.table_title = table_title

        has_table = self.table_title and self.items
        if not has_table and self.message is None:
            # Nothing displayable was supplied, so a hint is added. A new
            # list is built instead of appending in place so that a list
            # passed in by the caller is never modified.
            self.warnings = [
                *self.warnings,
                'Please provide "table_title", "items" or "message" parameter',
            ]
class ResponseFormatter:
    """
    Renders a command response in the CLI, table or JSON view.
    :param function_result: response object exposing ``message``, ``items``,
        ``warnings``, ``table_title`` and ``error`` attributes
    :param view_format: ``CLI_VIEW``, ``TABLE_VIEW`` or ``JSON_VIEW``;
        selects the method used by ``prettify_response``
    """

    # Upper bound for the length of the dashed separator cells placed
    # between table rows.
    _SEPARATOR_MAX_WIDTH = 30

    def __init__(self, function_result, view_format):
        self.view_format = view_format
        self.function_result = function_result
        self.format_to_process_method = {
            CLI_VIEW: self.process_cli_view,
            TABLE_VIEW: self.process_table_view,
            JSON_VIEW: self.process_json_view,
        }

    @staticmethod
    def unpack_success_result_values(response_meta):
        """
        :return: tuple ``(warnings, message, items, table_title)`` read
            from ``response_meta``
        """
        warnings = response_meta.warnings
        message = response_meta.message
        items = response_meta.items
        table_title = response_meta.table_title
        return warnings, message, items, table_title

    def process_cli_view(self, response_meta):
        """
        Default view: a table when both a title and items are present,
        an ``Error:`` prefixed message for failed responses, otherwise the
        message itself.
        """
        warnings, message, items, table_title = \
            self.unpack_success_result_values(response_meta=response_meta)
        if table_title and items:
            return self.process_table_view(response_meta=response_meta)
        if response_meta.error:
            return f'Error:{os.linesep}{message}'
        return message  # probably json, so we don't want to mangle it

    @staticmethod
    def _collect_table_rows(items):
        """
        Turn response items into table data.

        Items that are not dicts are shown in a single ``Result`` column.
        Every item yields exactly one row; a column the item does not
        define is filled with an empty string, so values always stay in
        the row of the item they belong to.
        :param items: iterable of dicts and/or plain values
        :return: tuple ``(headers, rows, separator)`` where ``headers``
            lists the keys in first-seen order, ``rows`` holds one list of
            cell values per item and ``separator`` is a row of dashes, one
            cell per header
        """
        headers = []
        widest_value = {}
        normalized = []
        for item in items:
            if not isinstance(item, dict):
                item = {'Result': item}
            normalized.append(item)
            for key, value in item.items():
                if key not in widest_value:
                    headers.append(key)
                    widest_value[key] = 0
                widest_value[key] = max(widest_value[key], len(str(value)))

        rows = [
            [item.get(header, '') for header in headers]
            for item in normalized
        ]
        # Each dashed cell is as long as the wider of the header and the
        # longest value in that column, capped by _SEPARATOR_MAX_WIDTH.
        separator = [
            '-' * min(max(widest_value[header], len(str(header))),
                      ResponseFormatter._SEPARATOR_MAX_WIDTH)
            for header in headers
        ]
        return headers, rows, separator

    def process_table_view(self, response_meta: 'CommandResponse') -> str:
        """
        Render the response as a PrettyTable converted to text.

        A message, when present, takes precedence and is shown in a single
        ``MODULAR_API_RESPONSE`` column; otherwise titled items are shown
        one row per item with a dashed row between consecutive items. The
        title (if any) is put on the line above the table and warnings
        (if any) are appended below it.
        :return: the rendered text
        """
        warnings, message, items, table_title = \
            self.unpack_success_result_values(response_meta=response_meta)
        table = PrettyTable()

        if message:
            table.field_names = [MODULAR_API_RESPONSE]
            # NOTE(review): relies on PrettyTable's private ``_max_width``
            # attribute, presumably to cap the column width - confirm
            # against the installed prettytable version.
            table._max_width = {MODULAR_API_RESPONSE: 70}
            table.add_row([message])
        elif table_title and items:
            headers, rows, separator = self._collect_table_rows(items)
            table.field_names = headers
            table._max_width = {
                header: MAX_COLUMNS_WIDTH for header in headers
            }
            for position, row in enumerate(rows):
                # Separators go only between rows, so no trailing one has
                # to be removed afterwards.
                if position:
                    table.add_row(separator)
                table.add_row(row)

        rendered = (table_title + os.linesep if table_title else str()
                    ) + str(table)
        if warnings:
            rendered += _prettify_warnings(warnings)
        return rendered

    @staticmethod
    def process_json_view(response_meta: 'CommandResponse'):
        """
        Render the response as indented JSON.

        Non-empty ``items`` are dumped as they are. Otherwise an envelope
        with status, message, warnings, items and table_title is dumped
        with all falsy entries left out.
        """
        if response_meta.items:
            return json.dumps(response_meta.items, indent=4)
        envelope = {
            'status': 'error' if response_meta.error else 'success',
            'message': response_meta.message,
            'warnings': response_meta.warnings,
            'items': response_meta.items,
            'table_title': response_meta.table_title,
        }
        populated = {k: v for k, v in envelope.items() if v}
        return json.dumps(populated, indent=4)

    def prettify_response(self):
        """
        Format ``function_result`` with the method registered for
        ``view_format``.
        """
        view_processor = self.format_to_process_method[self.view_format]
        return view_processor(response_meta=self.function_result)
def _prettify_warnings(warnings: list):
    """
    Build the warnings section of a response: a ``WARNINGS:`` header
    surrounded by line separators, followed by the warnings numbered from
    1, one per line.
    :param warnings: warning texts (strings)
    :return: the formatted section
    """
    header = f'{os.linesep}WARNINGS:{os.linesep}'
    numbered = [
        f'{number}. ' + text
        for number, text in enumerate(warnings, start=1)
    ]
    return header + os.linesep.join(numbered)
def get_command_info(func):
    """
    Derive the audit identifiers of a command function.
    :param func: object exposing ``__module__`` and ``__name__``
    :return: tuple ``(group, command)`` - the last dotted component of the
        module path and the function name
    """
    module_tail = func.__module__.rpartition('.')[2]
    return module_tail, func.__name__
def produce_audit(secured_params=None):
    """
    Creates audit event and publishes after successful execution.

    The event holds the command group and name, the current UTC timestamp,
    the keyword arguments as JSON, the command result and its warnings.
    When the wrapped function raises, the exception propagates and no
    event is saved.
    :param secured_params: names of secured parameters - their values are
        replaced with ``*****`` in the audit event.
    :return:
    """
    def real_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            group, command = get_command_info(func=func)

            # Work on a copy so that masking never touches the arguments
            # the command itself receives.
            audited_params = deepcopy(kwargs)
            if secured_params:
                for name in kwargs:
                    if name in secured_params:
                        audited_params[name] = '*****'

            outcome = func(*args, **kwargs)

            SERVICE_PROVIDER.audit_service.save_audit(
                group=group,
                command=command,
                timestamp=utc_time_now().isoformat(),
                parameters=json.dumps(audited_params),
                # Fall back to the textual form of the items when the
                # command produced no message.
                result=outcome.message or str(outcome.items),
                warnings=outcome.warnings
            )
            return outcome

        return wrapper

    return real_wrapper