modular_api/commands_generator.py (412 lines of code) (raw):
import copy
import importlib
import inspect
import os
import re
import sys
from datetime import datetime, timedelta
from unittest.mock import patch
from click import Group
from click.types import Choice, IntRange, FloatRange
from modular_api.helpers.log_helper import get_logger
# Regex capturing the text between the brackets of an
# ``allowed_extensions=[...]`` argument found in a decorator source line.
ALLOWED_EXTENSIONS_PATTERN = r"(?<=allowed_extensions=\[)['., \w]+(?=])"
# Separator used to split a group file name into nested group names.
GROUP_NAME_SEPARATOR = '_'
# Module-level default route method; CommandsDefinitionsExtractor declares
# its own DEFAULT_METHOD with the same value.
DEFAULT_METHOD = 'POST'
_LOG = get_logger(__name__)
# Source-text markers: a ``@click.option`` line containing any of these
# substrings is treated as a path-to-file parameter by
# ``_get_param_def_from_line``.
FILE_CHECKS_CALLBACKS = ['callback=check_path_exists_required',
'callback=check_path_exists_optional',
'callback=check_file_exists_required',
'callback=validate_file_required',
'callback=validate_file_optional',
'callback=create_file_if_it_not_exists']
# Markers that make a parameter required. They are matched as substrings of
# the option's source line (``_get_param_def_from_line``) and compared with
# the click callback's ``__name__`` (``CommandsDefinitionsExtractor.extract``).
REQUIRED_PARAM_CALLBACKS = ['required=True',
'check_required_param',
'check_required_param_and_to_lower',
'verify_required_email',
'verify_required_email_and_to_lower',
'check_required_list_and_convert_in_upper_case',
'check_required_param_and_convert_in_upper_case',
'check_param_by_regex_required',
'check_path_exists_required',
'check_file_exists_required',
'check_is_digit_required',
'parse_date_and_convert_to_timestamp_required',
'parse_date_required',
'parse_date_yyyy_mm',
'parse_date_yyyy_mm_dd_hh',
'validate_file_required']
# Canned return value used to patch
# ``botocore.client.BaseClient._make_api_call`` while group modules are being
# imported (see ``generate_valid_commands``).
# NOTE: 'Expiration' is evaluated once, when this module is imported.
DICT_WITH_CREDS_FOR_MOCK = {
'Arn': 'user/asdfasdf',
'Credentials': {
'SecretAccessKey': 'mock',
'AccessKeyId': 'mock',
'SessionToken': 'mock',
'Expiration': datetime.utcnow() + timedelta(seconds=10)
}
}
def resolve_group_name(group_file):
    """Derive the group name(s) encoded in a group file name.

    :param group_file: file name of a group module, e.g. ``'aws_ec2.py'``
    :return: tuple ``(group_full_name, group_name)`` where
        ``group_full_name`` is whatever ``__resolve_group_name`` returns for
        the file name without its extension (a list of name components, or
        the plain string when there is no separator) and ``group_name`` is
        the last component of that list, or the string itself.
    """
    # Everything before the first dot is considered the name without extension.
    file_name_wo_ext = group_file.split('.')[0]
    group_full_name_list = __resolve_group_name(file_name_wo_ext)
    if isinstance(group_full_name_list, list):
        group_name = group_full_name_list[-1]
    else:
        group_name = group_full_name_list
    return group_full_name_list, group_name
def get_file_names_which_contains_admin_commands(path_to_scan):
    """List the group module files located directly in ``path_to_scan``.

    :param path_to_scan: directory passed to ``os.listdir``
    :return: names (not full paths) of the entries that end with ``.py`` and
        do not start with an underscore, in ``os.listdir`` order
    """
    matching_files = []
    for entry in os.listdir(path_to_scan):
        # Underscore-prefixed files (``__init__.py`` and the like) are skipped.
        if entry.startswith('_'):
            continue
        if entry.endswith('.py'):
            matching_files.append(entry)
    return matching_files
def _resolve_root_group_name(file_content):
root_group_name = None
for idx, line in enumerate(file_content):
if 'project.scripts' in line or 'console_scripts' in line:
root_file_path = file_content[idx + 1]
root_group_name = re.findall(r"(?<=\.)[^.:\s]+(?=:)", root_file_path)
if not root_group_name:
raise AssertionError('Can not resolve root group name')
return root_group_name[0]
def generate_valid_commands(destination_folder,
                            path_to_setup_file_in_module,
                            path_to_scan=None,
                            mount_point='',
                            is_private_mode_enabled=False):
    """Import every group file in ``path_to_scan`` and build the command tree.

    :param destination_folder: folder temporarily appended to ``sys.path``
        while the group modules are imported
    :param path_to_setup_file_in_module: path to the setup file whose lines
        are passed to ``_resolve_root_group_name``
    :param path_to_scan: directory containing the group ``.py`` files
    :param mount_point: forwarded to ``CommandsDefinitionsExtractor``
    :param is_private_mode_enabled: when true only groups whose first name
        component is ``private`` are processed, otherwise only the others
    :return: ``{'type': 'module', 'body': {...}}`` where ``body`` maps group
        names to ``{'type': 'group', 'body': commands}`` (nested under their
        parent names for subgroups) and root-group commands are placed
        directly into ``body`` with ``'type': 'root command'``
    :raises AssertionError: if the root group name can not be resolved
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded, so import it explicitly before using it.
    import importlib.util

    _LOG.info(f'[commands] Path to scan: {path_to_scan}')
    valid_methods = {'type': 'module', 'body': {}}
    listdir = get_file_names_which_contains_admin_commands(
        path_to_scan=path_to_scan)

    # Remember whether the entry was added here, so that a pre-existing
    # entry is left untouched and cleanup also happens on errors.
    path_added = destination_folder not in sys.path
    if path_added:
        sys.path.append(destination_folder)
    try:
        with open(path_to_setup_file_in_module) as file:
            file_content = file.readlines()
        root_group_name = _resolve_root_group_name(file_content=file_content)

        for group_file in sorted(listdir):
            group_full_name_list, group_name = resolve_group_name(
                group_file=group_file)
            is_nested = isinstance(group_full_name_list, list)
            is_private_group = (
                (is_nested and group_full_name_list[0] == 'private')
                or group_full_name_list == 'private')
            is_subgroup = is_nested and not is_private_group
            is_root_group = root_group_name == group_name
            # Private groups are processed only in private mode and
            # public groups only outside of it.
            if is_private_group != bool(is_private_mode_enabled):
                continue

            # from index.py -> get_module_group_and_associate_object
            module_spec = importlib.util.spec_from_file_location(
                group_name,
                os.path.join(path_to_scan, group_file))
            imported_module = importlib.util.module_from_spec(module_spec)
            # botocore API calls made while the module body executes get
            # the canned response instead of reaching a real service.
            with patch('botocore.client.BaseClient._make_api_call',
                       return_value=DICT_WITH_CREDS_FOR_MOCK):
                module_spec.loader.exec_module(imported_module)

            commands = CommandsDefinitionsExtractor(
                group_name, imported_module, mount_point
            ).extract(subgroup=is_subgroup)
            group_meta = {"type": "group", 'body': commands}

            if is_subgroup:
                # At most two parent levels are materialised; the group
                # itself is always registered under its last name component.
                if len(group_full_name_list) > 2:
                    parents = group_full_name_list[:2]
                else:
                    parents = group_full_name_list[:1]
                node = valid_methods
                for parent in parents:
                    children = node['body']
                    if not children.get(parent):
                        children[parent] = {'body': {}}
                    node = children[parent]
                node['body'][group_name] = group_meta
            elif is_root_group:
                # Commands of the root group are exposed at the top level.
                for root_command_meta in commands.values():
                    root_command_meta['type'] = 'root command'
                valid_methods['body'].update(commands)
            else:
                valid_methods['body'][group_name] = group_meta
    finally:
        if path_added and destination_folder in sys.path:
            sys.path.remove(destination_folder)
    return valid_methods
def __resolve_group_name(group_filename):
    """Split a group file name into its name components.

    :param group_filename: file name without extension
    :return: the unchanged string when it does not contain
        ``GROUP_NAME_SEPARATOR``, otherwise the list of components obtained
        by splitting on that separator
    """
    components = group_filename.split(GROUP_NAME_SEPARATOR)
    # A single component means the separator was absent from the name.
    return components if len(components) > 1 else group_filename
def _get_param_def_from_line(line):
    """Heuristically parse the source text of one ``@click.option(...)``.

    The line is cut into chunks at single quotes and every chunk is checked
    for known markers (option names, ``help=``, ``is_flag``, ``type=``,
    ``multiple`` and the file-check callbacks).

    :param line: decorator source, already joined into a single line
    :return: dict with keys ``name``, ``alias``, ``required``,
        ``description``, ``type`` and ``is_flag``; ``convert_content_to_file``
        is added when a file-check callback marker is present and
        ``temp_file_extension`` when an ``allowed_extensions=[...]`` list was
        found as well
    """
    # Empty string literals ('') would otherwise shift the quote-delimited
    # chunks, so they are removed up front.
    line = line.replace('\'\'', '')
    chunks = [chunk for chunk in line.split('\'') if chunk]

    param_name = None
    param_doc = None
    alias_name = None
    param_type = 'str'
    is_flag = False
    is_path_to_file = False
    file_extension = None

    for index, part in enumerate(chunks):
        # todo this all does not seem right...
        if any(marker in part for marker in FILE_CHECKS_CALLBACKS):
            is_path_to_file = True
            match = re.search(ALLOWED_EXTENSIONS_PATTERN, line)
            if match:
                file_extension = [extension.strip("\"'")
                                  for extension in match.group().split(', ')]
        if re.match(r'^--[a-z]', part):
            param_name = part.replace('--', '')
        # ``A-Z`` (the previous ``A-z`` range also accepted ``[ \ ] ^ _ ```).
        if re.match(r'^-[a-zA-Z]', part):
            alias_name = part.replace('-', '')
        # The help text is the chunk that follows ``help=``; when the text is
        # not single-quoted there may be no such chunk.
        if 'help=' in part and index + 1 < len(chunks):
            param_doc = chunks[index + 1]
            if '*' in param_doc:
                param_doc = param_doc.replace('*', '').strip()
        if 'is_flag' in part:
            param_type = 'bool'
            is_flag = True
        if 'type' in part:
            click_type = part.split('type=', 1)[-1].split(',')[0]
            if 'Choice' in click_type:
                click_type = 'enum'
            if 'IntRange' in click_type or 'float' in click_type:
                click_type = 'num'
            if click_type not in ['list', 'str', 'bool', 'enum', 'num']:
                click_type = 'str'
            param_type = click_type
        if 'multiple' in part:
            param_type = 'list'

    response = {
        'name': param_name,
        'alias': alias_name,
        'required': any(marker in line
                        for marker in REQUIRED_PARAM_CALLBACKS),
        'description': param_doc,
        'type': param_type,
        'is_flag': is_flag
    }
    if is_path_to_file:
        response['convert_content_to_file'] = is_path_to_file
        if file_extension:
            response['temp_file_extension'] = file_extension
    return response
class CommandsDefinitionsExtractor:
    """Collect metadata about the click commands declared in a group module.

    Parameter metadata is read from the click command objects found in the
    module; route, secured-parameter, file-parameter, flag and hidden-command
    information is parsed from the module's source text.
    """

    DEFAULT_METHOD = 'POST'
    DEFAULT_MOUNT_POINT = '/'
    # click type names translated into the type names used in the metadata;
    # names missing here are passed through unchanged.
    click_to_our_types_mapping = {
        'choice': 'enum',
        'boolean': 'bool',
        'text': 'str',
        'integer': 'str'
    }

    def __init__(self, group_name, module, mount_point=DEFAULT_MOUNT_POINT):
        """
        :param group_name: name of the click group the module declares
        :param module: imported group module object
        :param mount_point: prefix of generated route paths; ``'/'`` means
            no prefix
        """
        self._group_name = group_name
        self._module = module
        self._mount_point = mount_point
        self._Command = getattr(
            importlib.import_module('click.core'), 'Command'
        )

    @staticmethod
    def _get_alias(opts):
        """Get alias from click's opts. They look like ['--param', '-p'].
        There is no guarantee that the alias will be second.

        :return: the first option whose second character is not a dash, with
            its first character removed; ``None`` when there is only one
            option or no such option exists
        """
        if len(opts) == 1:
            return None
        for opt in opts:
            if opt[1] != '-':
                return opt[1:]
        return None

    @staticmethod
    def _merge_route_configs(primary, secondary):
        """Overlay truthy values of ``primary`` onto the keys of ``secondary``.

        ``secondary`` is updated in place and returned; keys that exist only
        in ``primary`` are not copied.
        """
        for node in secondary:
            if primary.get(node):
                secondary[node] = primary.get(node)
        return secondary

    def _module_file_stem(self):
        """Return the module's file name without directory and extension."""
        module_file = getattr(self._module, '__file__', None)
        if module_file:
            return os.path.splitext(os.path.basename(module_file))[0]
        # Legacy fallback for objects without ``__file__``: search the
        # textual representation of the module for ``<name>.py``.
        return re.search(r'\w*.(?=.py)', str(self._module)).group(0)

    def _get_default_route_config(self, group_full_name, command_name,
                                  subgroup):
        """Build the default route of a command.

        :param group_full_name: group name, or list of name components
        :param command_name: name of the command
        :param subgroup: when true the group part of the path is derived
            from the module file name (``aws_ec2.py`` -> ``aws/ec2``)
            instead of ``group_full_name``
        :return: dict with ``method``, ``path`` and ``group_path`` (the last
            two hold the same value)
        """
        if subgroup:
            # The file name is used rather than the whole module path, so
            # directories such as ``.../lib/python3.x/...`` cannot leak into
            # the route.
            group_path = '/'.join(self._module_file_stem().split('_'))
        elif isinstance(group_full_name, list):
            group_path = '/'.join(group_full_name)
        else:
            group_path = group_full_name
        prefix = '' if self._mount_point == '/' else self._mount_point
        path = f'{prefix}/{group_path}/{command_name}'
        return {
            'method': self.DEFAULT_METHOD,
            'path': path,
            'group_path': path
        }

    @staticmethod
    def _get_route_configuration_from_line(line, default):
        """Parse the keyword arguments of an ``@api_route(...)`` line.

        :param line: decorator source joined into one line
        :param default: not used; kept for interface compatibility
        :return: dict that starts as ``{'method': 'GET'}`` and is extended
            with every ``name=value`` pair found between the first ``(`` and
            the first ``)``; single quotes are removed from the values
        """
        default_route = {'method': 'GET'}
        if '=' not in line:
            return default_route
        line = line.replace('\'', '')
        arguments = line[line.index('(') + 1: line.index(')')]
        for parameter_def in arguments.split(','):
            pieces = parameter_def.split('=')
            # Positional arguments carry no ``=`` and can not be mapped.
            if len(pieces) < 2:
                continue
            default_route[pieces[0].strip()] = pieces[1].strip()
        return default_route

    @staticmethod
    def _get_parameters_to_be_secured(line):
        """Extract the names listed in ``secured_params=[...]``.

        The bracket pair that follows ``secured_params=`` is used, so other
        list arguments placed earlier on the same line are ignored. When the
        marker is absent the first bracket pair of the line is used.

        :return: list of stripped, non-empty names; empty when no bracket
            pair is found
        """
        marker = 'secured_params='
        line = line.replace('\'', '')
        anchor = line.find(marker)
        search_from = anchor + len(marker) if anchor != -1 else 0
        start_index = line.find('[', search_from)
        if start_index == -1:
            return []
        end_index = line.find(']', start_index)
        if end_index == -1:
            return []
        names = line[start_index + 1: end_index].split(',')
        return [name.strip() for name in names if name.strip()]

    @staticmethod
    def _collect_command_lines(lines, index):
        """Gather the source lines that belong to one command definition.

        Starting after the ``@<group>.command`` line at ``index``, stripped
        lines are collected until the command's docstring has been passed
        (two ``\"\"\"`` markers seen) or, for a function without a
        docstring, until the line that closes the signature. Lines holding
        ``\"\"\"`` are not collected.
        """
        command_lines = [lines[index]]
        total = len(lines)
        quotes_seen = 0
        position = index + 1
        # definition of command function
        command_def_line_passed = False
        while quotes_seen < 2 and position < total:
            current_line = lines[position]
            next_line = lines[position + 1] if position + 1 < total else ''
            position += 1
            if 'def ' in current_line:
                command_def_line_passed = True
            if command_def_line_passed and '):' in current_line and \
                    '\"\"\"' not in next_line:
                quotes_seen = 2  # signature closed and no docstring follows
            if '\"\"\"' in current_line:
                # A one-line docstring opens and closes on the same line,
                # hence occurrences are counted rather than lines.
                quotes_seen += current_line.count('\"\"\"')
                continue
            command_lines.append(current_line.strip())
        return command_lines

    @staticmethod
    def _join_continuation_lines(command_lines):
        """Glue continuation lines to the decorator/def line they belong to."""
        prepared_lines = []
        for raw_line in command_lines:
            text = str(raw_line)
            is_new_statement = text.startswith('@') or text.startswith('def')
            if is_new_statement or not prepared_lines:
                prepared_lines.append(raw_line)
            else:
                prepared_lines[-1] = prepared_lines[-1] + raw_line
        return prepared_lines

    def _get_api_route_flag_and_secured_params(self, subgroup):
        """Parse per-command settings from the module's source text.

        :param subgroup: forwarded to ``_get_default_route_config``
        :return: dict mapping a command name to a dict with the keys
            ``route``, ``secure_parameters``, ``files_parameters``,
            ``is_command_hidden`` and ``flag_parameters``
        """
        lines = inspect.getsource(self._module).split('\n')
        command_marker = '@{}.command'.format(self._group_name)
        command_definitions = {}
        for index, line in enumerate(lines):
            if line.startswith('#') or command_marker not in line:
                continue
            # find from @{group}.command to enclosing """ of docstring
            prepared_lines = self._join_continuation_lines(
                self._collect_command_lines(lines, index))

            # analyze lines
            name = None
            is_command_hidden = False
            route_config = {}
            secure_parameters = []
            files_parameters = {}
            security_parameters_found = False
            secured_parameters_string = ''
            flag_parameters = []
            for prepared in prepared_lines:
                if command_marker in prepared:
                    # The command name is the first single-quoted value.
                    name = prepared.split('\'')[1]
                if '@api_route' in prepared:
                    route_config = self._get_route_configuration_from_line(
                        line=prepared, default=route_config)
                if '@click.option' in prepared:
                    response = _get_param_def_from_line(prepared)
                    param_name = response.get('name')
                    if response.get('is_flag'):
                        flag_parameters.append(param_name)
                    convert_content_to_file = response.get(
                        'convert_content_to_file')
                    temp_file_extension = response.get('temp_file_extension')
                    if convert_content_to_file or temp_file_extension:
                        files_parameters[param_name] = {
                            'convert_content_to_file':
                                convert_content_to_file,
                            'temp_file_extension': temp_file_extension
                        }
                if '@shadow_wrapper' in prepared:
                    is_command_hidden = True
                if 'secured_params=' in prepared \
                        and not security_parameters_found:
                    secured_parameters_string += prepared
                    if ']' in prepared:
                        security_parameters_found = True
                        secure_parameters = \
                            self._get_parameters_to_be_secured(
                                secured_parameters_string)

            default_rt = self._get_default_route_config(
                group_full_name=self._group_name,
                command_name=name,
                subgroup=subgroup
            )
            route_config = self._merge_route_configs(primary=route_config,
                                                     secondary=default_rt)
            command_definitions[name] = {
                'route': route_config,
                'secure_parameters': secure_parameters,
                'files_parameters': files_parameters,
                'is_command_hidden': is_command_hidden,
                'flag_parameters': flag_parameters
            }
        return command_definitions

    def _describe_parameter(self, param):
        """Build the metadata dict of a single click parameter."""
        callback_name = getattr(param.callback, '__name__', None)
        required = (param.callback and callback_name
                    in REQUIRED_PARAM_CALLBACKS) or param.required
        # Not every click parameter class exposes ``help``.
        help_text = getattr(param, 'help', None)
        param_meta = {
            'name': param.human_readable_name,
            'alias': self._get_alias(param.opts),
            'required': required,
            'description': help_text.replace('* ', '') if help_text else '',
            'type': self.click_to_our_types_mapping.get(
                param.type.name, param.type.name)
        }
        if isinstance(param.type, Choice):
            # TODO refactor asap
            choices = '|'.join(str(choice) for choice in param.type.choices)
            param_meta['description'] += f' {choices}'
        if isinstance(param.type, (IntRange, FloatRange)):
            # TODO update click and use get_metavar
            # Only a missing bound is rendered empty; a bound of 0 is kept.
            lower = '' if param.type.min is None else param.type.min
            upper = '' if param.type.max is None else param.type.max
            param_meta['description'] += f' {lower}<=x<={upper}'
        return param_meta

    def extract(self, subgroup):
        """Return the metadata of every visible command of the module.

        :param subgroup: forwarded to the route generation
        :return: dict mapping a command name to ``{'body': {...}}`` where the
            body holds ``description``, ``parameters``, ``handler``,
            ``parent`` plus the settings parsed from the source (``route``,
            ``secure_parameters``, ``is_command_hidden``); commands marked
            as hidden are left out
        """
        definitions = {}
        for entity in dir(self._module):
            click_command = getattr(self._module, entity)
            # Only plain commands are described: the group object itself and
            # any nested click groups are skipped.
            if not isinstance(click_command, self._Command) or \
                    click_command.name == self._group_name or \
                    isinstance(click_command, Group):
                continue
            definitions[click_command.name] = {
                'body': {
                    'description': click_command.help,
                    'parameters': [self._describe_parameter(param)
                                   for param in click_command.params],
                    'handler': entity,
                    'parent': self._group_name
                }
            }

        routes_and_secured_params = (
            self._get_api_route_flag_and_secured_params(subgroup))
        for name, body in definitions.items():
            command_config = routes_and_secured_params.get(name, {})
            # File settings are merged into the matching parameters and are
            # therefore removed from the command-level config.
            files_parameters = command_config.pop('files_parameters', None)
            if files_parameters:
                for param in body['body']['parameters']:
                    param_name = param.get('name')
                    if param_name in files_parameters:
                        param.update(files_parameters[param_name])
            body['body'].update(command_config)

        definitions_copy = copy.deepcopy(definitions)
        for name, body in definitions.items():
            command_config = routes_and_secured_params.get(name, {})
            flag_parameters = command_config.get('flag_parameters')
            if flag_parameters:
                for idx, param in enumerate(body['body']['parameters']):
                    if param['name'] in flag_parameters:
                        definitions_copy[name]['body']['parameters'][idx][
                            'is_flag'] = True
            definitions_copy[name]['body'].pop('flag_parameters', None)
            is_command_hidden = command_config.pop('is_command_hidden', None)
            if is_command_hidden:
                definitions_copy.pop(name)
        return definitions_copy