modular_cli/service/decorators.py (395 lines of code) (raw):

import itertools import json import os import shutil import sys from functools import wraps import click import yaml from prettytable import PrettyTable from modular_cli import ENTRY_POINT from modular_cli.service.config import ( add_data_to_config, ROOT_ADMIN_VERSION, ) from modular_cli.service.help_client import ( SetupCommandHandler, LoginCommandHandler, CleanupCommandHandler, EnableAutocompleteCommandHandler, DisableAutocompleteCommandHandler, VersionCommandHandler, ) from modular_cli.service.initializer import init_configuration from modular_cli.utils.exceptions import ( ModularCliBadRequestException, ModularCliBaseException, HTTP_CODE_EXCEPTION_MAPPING, ) from modular_cli.utils.logger import get_logger _LOG = get_logger(__name__) def init_config(func): @wraps(func) def wrapper(*args, **kwargs): init_configuration() return func(*args, **kwargs) return wrapper JSON_VIEW = 'json' TABLE_VIEW = 'table' HELP_COMMAND = 'help' COMMAND_KEY = 'command' CONFIGURATION_COMMAND = 'configuration_command' CLI_VIEW = 'cli' ERROR_STATUS = 'FAILED' FILE_NAME = 'modular-cli.log' SUCCESS_STATUS = 'SUCCESS' MODULAR_CLI_STATUS = 'status' MODULAR_CLI_CODE = 'code' MODULAR_CLI_ERROR_TYPE = 'error_type' MODULAR_CLI_MESSAGE = 'message' MODULAR_CLI_META = 'meta' MODULAR_CLI_TABLE_TITLE = 'table_title' MODULAR_CLI_ITEMS = 'items' MODULAR_CLI_WARNINGS = 'warnings' MODULAR_CLI_RESPONSE = 'response' MAX_COLUMNS_WIDTH = 46 FALLBACK_SIZE = (100, 30) # Default to 100 columns and 30 lines POSITIVE_ANSWERS = ['y', 'yes'] CONFIRMATION_MESSAGE = 'The command`s response is pretty huge and the ' \ 'result table structure can be broken.\nDo you want ' \ 'to show the response in the JSON format? [y/n]: ' SETUP_COMMAND_HELP = \ f'Usage: {ENTRY_POINT} setup [parameters]{os.linesep}' \ f'Parameters:{os.linesep} --username, Access key ' \ f'associated with the Maestro user{os.linesep} --password, Secret ' \ f'key associated with the Maestro user{os.linesep} --api_path, ' \ f'Address of the Maestro environment.' LOGIN_COMMAND_HELP = f'{os.linesep}Usage: {ENTRY_POINT} login{os.linesep}' \ f'{os.linesep}Returns JWT token and commands meta in ' \ f'accordance with the user\'s permissions' CLEANUP_COMMAND_HELP = f'{os.linesep}Usage: {ENTRY_POINT} cleanup [parameters]' \ f'{os.linesep}{os.linesep}Removes all the ' \ f'configuration data related to the tool.' class TextColors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' @classmethod def _wrap(cls, color: str, string: str) -> str: return color + string + cls.ENDC @classmethod def yellow(cls, string: str) -> str: return cls._wrap(cls.WARNING, string) @classmethod def green(cls, string: str) -> str: return cls._wrap(cls.OKGREEN, string) @classmethod def red(cls, string: str) -> str: return cls._wrap(cls.FAIL, string) def check_and_extract_received_params(arguments, required_params): result = [] missing = [] for arg, required in required_params.items(): if arg in arguments: result.append(arguments[arguments.index(arg) + 1]) else: if required: missing.append(arg.replace('--', '')) if missing: raise ModularCliBadRequestException( f'The following parameters are missing: {", ".join(missing)}') return result def dynamic_dispatcher(func): @wraps(func) def wrapper(ctx, *args, **kwargs): try: if not ctx.args: params = {HELP_COMMAND: True} return func(**params) view_type = CLI_VIEW if ctx.params.get(TABLE_VIEW): view_type = TABLE_VIEW if ctx.params.get(JSON_VIEW): view_type = JSON_VIEW if ctx.args[0] in ['setup', 'login', 'cleanup', 'version', 'enable_autocomplete', 'disable_autocomplete']: response = configuration_executor( config_command=ctx.args[0], config_command_help=ctx.params['help'], config_params=ctx.args, ) return response params_indexes = [ ctx.args.index(arg) for arg in ctx.args if arg.startswith('-') ] if params_indexes: command_end_index, *_ = params_indexes requested_command = ctx.args[:command_end_index] parameters = ctx.args[command_end_index:] response = func(help=kwargs.get('help'), command=requested_command, parameters=parameters, view_type=view_type) else: requested_command = ctx.args parameters = {} response = func(help=kwargs.get('help'), command=requested_command, parameters=parameters, view_type=view_type) return response except ModularCliBaseException as e: response = CommandResponse(message=str(e), code=e.code) return response return wrapper def process_meta(server_meta): new_meta = {} for mount_point, meta in server_meta.items(): bare_module_name = mount_point.replace('/', '') if not bare_module_name: # in case of / mount point add_data_to_config(name=ROOT_ADMIN_VERSION, value=meta['version']) new_meta.update(meta.get('body')) else: new_meta.update({bare_module_name: meta}) return new_meta CONFIG_COMMAND_HANDLER_MAPPING = { 'setup': SetupCommandHandler, 'login': LoginCommandHandler, 'cleanup': CleanupCommandHandler, 'enable_autocomplete': EnableAutocompleteCommandHandler, 'disable_autocomplete': DisableAutocompleteCommandHandler, 'version': VersionCommandHandler } def configuration_executor(config_command, config_command_help, config_params): config_command_class = CONFIG_COMMAND_HANDLER_MAPPING[config_command] initiate_appropriate_command = config_command_class( config_command_help=config_command_help, config_params=config_params) return initiate_appropriate_command.process_passed_command() def cli_response(): def internal(func): @wraps(func) def wrapper(*args, **kwargs): view_format = JSON_VIEW response = func(*args, **kwargs) pretty_response = ResponseFormatter( function_result=response, view_format=view_format, ).prettify_response() click.echo(pretty_response) return wrapper return internal class ResponseDecorator: """ Wrapper for formatting cli command response :param stdout: function which prints response to the end user :param error_message: message that will be displayed in case command failed to execute :param secured_params: value of this parameters will be replaced with asterisks. :return: """ view_format = 'TEST' def __init__(self, stdout, error_message, secured_params=None): self.stdout = stdout self.error_message = error_message self.secured_params = secured_params def __call__(self, fn): @wraps(fn) def decorated(*args, **kwargs): _FUNC_LOG = _LOG.getChild(fn.__name__) # todo remove ? view_format = CLI_VIEW table_format = kwargs.pop(TABLE_VIEW, False) json_format = kwargs.pop(JSON_VIEW, False) if table_format: view_format = TABLE_VIEW elif json_format: view_format = JSON_VIEW global VIEW_FORMAT # there is no global VIEW_FORMAT = view_format resp = fn(*args, **kwargs) # CommandResponse if not isinstance(resp, CommandResponse): warn_message = ['Response is broken and does not match ' 'CommandResponse object'] # todo this looks like developer's error. Exception here _FUNC_LOG.warning(warn_message) resp = CommandResponse(message=resp, warnings=warn_message, code=400) func_result = ResponseFormatter(function_result=resp, view_format=view_format) response = self.stdout(func_result.prettify_response()) if not ResponseFormatter.is_response_success(resp): sys.exit(1) return response return decorated class CommandResponse: def __init__( self, code=200, message: str | None = None, items: list | None = None, warnings=None, table_title=None, **kwargs ): """ Considering kwargs to be extra attributes that are specific to each module """ self.code = code self.message = message self.warnings = warnings or [] self.items = items self.table_title = table_title # Remove status from meta self.status = kwargs.pop('status', None) or kwargs.pop('Status', None) # modular-api provides status of operation which can always be # determined by status code. Here this self.status not used self.meta = dict(kwargs) if not (self.table_title and self.items) and self.message is None: self.warnings.append( 'Please provide "table_title" and "items" or "message" ' 'parameter') class ResponseFormatter: def __init__(self, function_result, view_format): self.view_format = view_format self.function_result = function_result self.format_to_process_method = { CLI_VIEW: self.process_cli_view, JSON_VIEW: self.process_json_view, TABLE_VIEW: self.process_table_view } @staticmethod def _prettify_warnings(warnings: list): return f'{os.linesep}WARNINGS:{os.linesep}' + \ f'{os.linesep}'.join(str(i + 1) + '. ' + warnings[i] for i in range(len(warnings))) @staticmethod def is_response_success(response_meta: CommandResponse): return 200 <= response_meta.code < 400 @staticmethod def unpack_success_result_values(response_meta: CommandResponse): success_code = response_meta.code warnings = response_meta.warnings message = response_meta.message items = response_meta.items table_title = response_meta.table_title return success_code, warnings, message, items, table_title @staticmethod def unpack_error_result_values(response_meta: CommandResponse): error_code = response_meta.code error_type = HTTP_CODE_EXCEPTION_MAPPING[error_code].__name__ message = response_meta.message return error_type, error_code, message def process_cli_view(self, status: str, response_meta: CommandResponse): if status == ERROR_STATUS: error_type, error_code, message = self.unpack_error_result_values( response_meta=response_meta) return f'Error:{os.linesep}{message}' elif status == SUCCESS_STATUS: success_code, warnings, message, items, table_title = \ self.unpack_success_result_values(response_meta=response_meta) if table_title and items: return self.process_table_view(status=status, response_meta=response_meta) result_message = f'Response:{os.linesep}{message}' if warnings: result_message += self._prettify_warnings(warnings) return result_message def process_json_view(self, status: str, response_meta: CommandResponse): if status == ERROR_STATUS: error_type, error_code, message = self.unpack_error_result_values( response_meta=response_meta) return json.dumps({ MODULAR_CLI_STATUS: status, MODULAR_CLI_CODE: error_code, MODULAR_CLI_ERROR_TYPE: error_type, MODULAR_CLI_MESSAGE: message, MODULAR_CLI_META: response_meta.meta }, indent=4) elif status == SUCCESS_STATUS: success_code, warnings, message, items, table_title = \ self.unpack_success_result_values(response_meta=response_meta) if table_title and items: return json.dumps({ MODULAR_CLI_STATUS: status, MODULAR_CLI_CODE: success_code, MODULAR_CLI_TABLE_TITLE: table_title, MODULAR_CLI_ITEMS: items, MODULAR_CLI_WARNINGS: warnings, MODULAR_CLI_META: response_meta.meta }, indent=4) return json.dumps({ MODULAR_CLI_STATUS: status, MODULAR_CLI_CODE: success_code, MODULAR_CLI_MESSAGE: message, MODULAR_CLI_WARNINGS: warnings, MODULAR_CLI_META: response_meta.meta }, indent=4) def process_table_view( self, status: str, response_meta: CommandResponse, ) -> PrettyTable: response = PrettyTable() if status == ERROR_STATUS: response.field_names = [MODULAR_CLI_STATUS, MODULAR_CLI_CODE, MODULAR_CLI_MESSAGE] response._max_width = {MODULAR_CLI_STATUS: 10, MODULAR_CLI_CODE: 5, MODULAR_CLI_MESSAGE: 70} error_type, error_code, message = self.unpack_error_result_values( response_meta=response_meta) response.add_row([status, error_code, message]) response = response.__str__() return response elif status == SUCCESS_STATUS: success_code, warnings, message, items, table_title = \ self.unpack_success_result_values( response_meta=response_meta) if message: response.field_names = [MODULAR_CLI_STATUS, MODULAR_CLI_CODE, MODULAR_CLI_RESPONSE] response._max_width = {MODULAR_CLI_STATUS: 10, MODULAR_CLI_CODE: 5, MODULAR_CLI_RESPONSE: 70} response.add_row([status, success_code, message]) elif table_title and items: for i in range(len(items)): if isinstance(items[i], dict): for key, value in items[i].items(): if isinstance(value, str): items[i][key] = value.replace('\r\n', '\n') elif isinstance(items[i], str): items[i] = items[i].replace('\r\n', '\n') # Step 0: Resolve non-dict elements items = [ {'Result': item} if not isinstance(item, dict) else item for item in items ] # Step 1: Allocate headers uniq_table_headers = list( dict.fromkeys(key for item in items for key in item.keys()) ) # Step 2: Fill all_values (by allocated headers, not by keys of item) all_values = { header: [item.get(header, "") for item in items] for header in uniq_table_headers } response.field_names = uniq_table_headers response.max_width = MAX_COLUMNS_WIDTH # Fills with an empty content absent items attributes to # align the table table_rows = itertools.zip_longest( *[j for i, j in all_values.items()], fillvalue='') for lst in table_rows: response.add_row(lst, divider=True) try: required_width = str(response).index('\n') terminal_columns = \ shutil.get_terminal_size(fallback=FALLBACK_SIZE).columns if required_width > terminal_columns: user_input = input(CONFIRMATION_MESSAGE).lower().strip() if user_input in POSITIVE_ANSWERS: return self.process_json_view(status, response_meta) except Exception: # noqa pass # ----- showing meta in table view ----- if response_meta.meta: meta = yaml.dump({ self.format_title(k): v for k, v in response_meta.meta.items() }) response = meta + str(response) # ----- showing meta in table view ----- meta_status = self.format_title('status') + ': ' + TextColors.green(status) if status != ERROR_STATUS else TextColors.red(status) response = (table_title + os.linesep if table_title else str()) + meta_status + os.linesep + str(response) if response_meta.warnings: response += self._prettify_warnings(response_meta.warnings) return response def prettify_response(self): status = SUCCESS_STATUS if self.is_response_success( response_meta=self.function_result) else ERROR_STATUS view_processor = self.format_to_process_method[self.view_format] prettified_response = view_processor( status=status, response_meta=self.function_result) return prettified_response @staticmethod def format_title(title: str) -> str: """ Human-readable """ return title.replace('_', ' ').capitalize()