modular_api/index.py (576 lines of code) (raw):

import functools
import importlib.util
import json
import os
import sys
from http import HTTPStatus
from importlib.metadata import version as lib_version
from pathlib import Path
from typing import Callable
from unittest.mock import MagicMock

import bottle
import click.exceptions
from bottle import request, Bottle, response
from ddtrace import tracer
from dotenv import load_dotenv
from limits import RateLimitItemPerSecond, RateLimitItem
from limits.storage import MemoryStorage, MongoDBStorage
from limits.strategies import MovingWindowRateLimiter, RateLimiter
from modular_sdk.modular import Modular

from modular_api.commands_generator import (
    resolve_group_name, get_file_names_which_contains_admin_commands
)
from modular_api.helpers.compatibility_check import check_version_compatibility
from modular_api.helpers.constants import (
    MODULES_PATH, MODULE_NAME_KEY, EVENT_TYPE, META, AUX_DATA, PRODUCT,
    JOB_ID, STATUS, HTTPMethod, ServiceMode, COMMANDS_BASE_FILE_NAME,
    MODULAR_API_USERNAME, API_MODULE_FILE, MOUNT_POINT_KEY, SWAGGER_HTML
)
from modular_api.helpers.exceptions import (
    ModularApiUnauthorizedException, ModularApiBadRequestException
)
from modular_api.helpers.jwt_auth import (
    encode_data_to_jwt,
    username_from_jwt_token,
    decode_jwt_token,
    gen_refresh_token_version,
    encode_data_to_refresh_jwt,
    validate_refresh_token,
)
from modular_api.helpers.log_helper import get_logger
from modular_api.helpers.params_converter import convert_api_params
from modular_api.helpers.request_processor import generate_route_meta_mapping
from modular_api.helpers.response_processor import process_response
from modular_api.helpers.response_utils import get_trace_id, build_response
from modular_api.helpers.utilities import token_from_auth_header
from modular_api.services import SP
from modular_api.services.environment_service import EnvironmentService
from modular_api.services.permissions_cache_service import (
    permissions_handler_instance
)
from modular_api.swagger.generate_open_api_spec import generate_definition
from modular_api.version import __version__
from modular_api.web_service import META_VERSION_ID
from modular_api.web_service.config import Config
from modular_api.web_service.response_processor import (
    build_exception_content, validate_request,
    extract_and_convert_parameters, get_group_path
)
from modular_api.services.refresh_token_service import RefreshTokenService

_LOG = get_logger(__name__)

MODULE_GROUP_GROUP_OBJECT_MAPPING = {}  # group path to imported module
CONFIG = Config()  # currently keeps only commands_base.json
# The tracer writer is replaced with a mock, so nothing is written by it.
# NOTE(review): presumably the tracer is kept only as a source of trace ids
# for responses - confirm before removing.
tracer.configure(writer=MagicMock())
WEB_SERVICE_PATH = os.path.dirname(__file__)
PERMISSION_SERVICE = permissions_handler_instance()
USAGE_SERVICE = SP.usage_service
THREAD_LOCAL_STORAGE = Modular().thread_local_storage_service()

# Shared by both help endpoints so the text cannot drift between them
_HELP_MESSAGE = (
    "This is the Modular-API administration tool. "
    "To request support, please contact "
    "Modular Support Team"
)


def resolve_permissions(tracer, empty_cache=False):
    """
    Builds a decorator that authenticates the current bottle request and
    injects ``allowed_commands`` and ``user_meta`` keyword arguments into
    the decorated handler.

    Any exception raised during authentication OR by the handler itself is
    logged and converted to an error response, so the decorated callable
    does not propagate exceptions.
    :param tracer: tracer used to resolve the trace id for error responses
    :param empty_cache: forwarded to ``PERMISSION_SERVICE.authenticate_user``
    :return: decorator
    """
    def decorator(func):
        # wraps keeps the handler's real name/docstring instead of "wrapper"
        @functools.wraps(func)
        def wrapper(*a, **ka):
            user, password = request.auth or (None, None)
            token = None
            if not password:  # not basic auth -> probably bearer
                header = request.headers.get('Authorization')
                token = token_from_auth_header(header) if header else None
            try:
                allowed_commands, user_meta = \
                    PERMISSION_SERVICE.authenticate_user(
                        username=user, password=password, token=token,
                        empty_cache=empty_cache
                    )
                ka['allowed_commands'] = allowed_commands
                ka['user_meta'] = user_meta
                return func(*a, **ka)
            except Exception as e:
                _LOG.exception('Exception occurred resolving permissions')
                # TODO sort out this trace id
                _trace_id = get_trace_id(tracer=tracer)
                code, content = build_exception_content(exception=e)
                return build_response(_trace_id=_trace_id,
                                      http_code=code,
                                      content=content)
        return wrapper
    return decorator


def get_module_group_and_associate_object() -> None:
    """
    Imports every command group file of every installed module and stores
    the imported python module in MODULE_GROUP_GROUP_OBJECT_MAPPING under
    its group path.

    Groups whose "private" flag does not match the private mode of the
    service are skipped. Directory entries without an API descriptor file
    are skipped with a warning.
    :return: None
    """
    modules_path = Path(__file__).parent.resolve() / MODULES_PATH
    # does not depend on the module, so it is resolved once
    private_mode_enabled = SP.env.is_private_mode_enabled()

    for module in os.listdir(modules_path):
        module_path = os.path.join(modules_path, module)
        module_api_config = os.path.join(module_path, API_MODULE_FILE)
        if not os.path.isfile(module_api_config):
            # stray file or directory that is not an installed module
            _LOG.warning(
                f'Skipping {module_path}: {API_MODULE_FILE} not found'
            )
            continue
        with open(module_api_config) as file:
            api_config = json.load(file)
        # group files are imported by path below; the module directory is
        # added to sys.path so their own imports can be resolved
        if module_path not in sys.path:
            sys.path.append(module_path)

        cli_path = api_config['cli_path']
        mount_point = api_config['mount_point']
        command_group_path = os.path.join(module_path, *cli_path.split('/'))
        group_files = get_file_names_which_contains_admin_commands(
            path_to_scan=command_group_path)

        for command_group in group_files:
            group_full_name_list, _ = resolve_group_name(
                group_file=command_group)
            # the resolved name is either a list of parts or a plain string
            if isinstance(group_full_name_list, list):
                is_private_group = group_full_name_list[0] == 'private'
                group_full_name = '/'.join(group_full_name_list)
            else:
                is_private_group = group_full_name_list == 'private'
                group_full_name = group_full_name_list

            # private groups are loaded only in private mode and vice versa
            if is_private_group ^ private_mode_enabled:
                continue

            group_path = get_group_path(mount_point=mount_point,
                                        group_name=group_full_name)
            group_file_path = os.path.join(command_group_path, command_group)
            module_spec = importlib.util.spec_from_file_location(
                group_full_name, group_file_path)
            if module_spec is None or module_spec.loader is None:
                _LOG.warning(f'Cannot build import spec for {group_file_path}')
                continue
            imported_module = importlib.util.module_from_spec(module_spec)
            module_spec.loader.exec_module(imported_module)
            MODULE_GROUP_GROUP_OBJECT_MAPPING[group_path] = imported_module


def initialize() -> None:
    """
    Loads the commands base file into CONFIG (an empty mapping if the file
    does not exist) and imports all command groups.
    Can raise
    :return: None
    """
    # loading configuration
    path = Path(WEB_SERVICE_PATH) / COMMANDS_BASE_FILE_NAME
    if os.path.exists(path):
        with open(path) as file:
            commands = json.load(file)
    else:
        commands = {}
    CONFIG.set_available_commands(available_commands=commands)
    _LOG.info('[init] Commands base phase completed')
    get_module_group_and_associate_object()


@tracer.wrap()
@resolve_permissions(tracer=tracer)
def web_help(allowed_commands, user_meta):
    """
    Returns all the commands allowed for the authenticated user.
    """
    _trace_id = get_trace_id(tracer=tracer)
    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content={'available_commands': allowed_commands},
        message=_HELP_MESSAGE
    )


@tracer.wrap()
@resolve_permissions(tracer=tracer)
def generate_group_or_command_help(path, allowed_commands, user_meta):
    """
    Returns help for a command or for a group of commands.

    An exact route match is returned alone. Otherwise every allowed route
    that contains ``path`` is returned. 404 if nothing matches.
    :param path: requested route taken from the url
    """
    _trace_id = get_trace_id(tracer=tracer)
    route_meta_mapping = generate_route_meta_mapping(
        commands_meta=allowed_commands)

    requested_command = []
    requested_commands = []
    for itinerary, command_meta in route_meta_mapping.items():
        # Equality must be checked first: an equal string also satisfies
        # the containment check, which made the exact branch unreachable.
        if path == itinerary:
            requested_command.append(command_meta)
        elif path in itinerary:
            requested_commands.append(command_meta)

    if not any((requested_command, requested_commands)):
        return build_response(
            _trace_id=_trace_id,
            http_code=404,
            message='Can not found requested resource')

    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content={
            'available_commands': requested_command or requested_commands},
        message=_HELP_MESSAGE
    )


def __validate_cli_version(_trace_id):
    """
    Checks the 'Cli-Version' request header against the minimal allowed
    CLI version.
    :param _trace_id: trace id to put into the error response
    :return: tuple (warnings, None) on success, (None, error response) if
        the compatibility check raised ModularApiBadRequestException
    """
    try:
        version_warnings = check_version_compatibility(
            min_allowed_version=SP.env.min_cli_version(),
            current_version=request.headers.get('Cli-Version')
        )
        return version_warnings, None
    except ModularApiBadRequestException as e:
        _LOG.warning('Version compatibility checker failed', exc_info=True)
        code, content = build_exception_content(exception=e)
        error_response = build_response(_trace_id=_trace_id,
                                        http_code=code,
                                        content=content)
        return None, error_response


@tracer.wrap()
@resolve_permissions(tracer=tracer, empty_cache=True)
def login(allowed_commands, user_meta):
    """
    Issues a JWT and a refresh token for the authenticated user.

    If the ``meta`` query parameter equals 'true' (case-insensitive) the
    allowed commands with module versions are added to the response.
    """
    _trace_id = get_trace_id(tracer=tracer)
    version_warning, error_response = __validate_cli_version(
        _trace_id=_trace_id
    )
    if error_response:
        return error_response

    username, _ = request.auth

    # the params dict maps each name to a list of values; the first one wins
    meta_param = request.params.dict.get('meta')
    meta_return = False
    if meta_param:
        first_value = (meta_param[0] if isinstance(meta_param, list)
                       else meta_param)
        meta_return = (isinstance(first_value, str)
                       and first_value.lower() == 'true')

    jwt_token = encode_data_to_jwt(username=username)

    # Generate and store a refresh JWT token
    rt_version = gen_refresh_token_version()
    refresh_token = encode_data_to_refresh_jwt(username, rt_version)
    RefreshTokenService.create_and_save(username, rt_version)

    data = {
        'jwt': jwt_token,
        'refresh_token': refresh_token,
        'version': __version__
    }
    if meta_return:
        add_versions_to_allowed_modules(allowed_commands)
        data['meta'] = allowed_commands
    if version_warning:
        data['warnings'] = version_warning
    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content=data
    )


@tracer.wrap()
def refresh():
    """
    Exchanges a valid refresh token from the JSON body for a new JWT and a
    new refresh token. Responds with 401 if the token is missing or is not
    accepted by ``validate_refresh_token``.
    """
    _trace_id = get_trace_id(tracer=tracer)

    # request.json is None when the body is not JSON
    body = request.json
    refresh_token = (body.get('refresh_token')
                     if isinstance(body, dict) else None)
    if not refresh_token:
        return build_response(
            _trace_id=_trace_id,
            http_code=HTTPStatus.UNAUTHORIZED,
            message='Refresh token is required',
        )

    # Validate the refresh token
    username, _ = validate_refresh_token(refresh_token)
    if not username:
        return build_response(
            _trace_id=_trace_id,
            http_code=HTTPStatus.UNAUTHORIZED,
            message='Invalid or expired refresh token',
        )

    # Generate a new JWT session token
    jwt_token = encode_data_to_jwt(username=username)

    # Generate and store a new refresh JWT token
    new_rt_version = gen_refresh_token_version()
    new_refresh_token = encode_data_to_refresh_jwt(username, new_rt_version)
    RefreshTokenService.create_and_save(username, new_rt_version)

    data = {
        'jwt': jwt_token,
        'refresh_token': new_refresh_token,
        'version': __version__,
    }
    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content=data
    )


def _iter_module_descriptors():
    """
    Yields the parsed API descriptor of each installed module directory.
    Entries that are not directories or have no descriptor are skipped.
    """
    for module in (Path(__file__).parent / MODULES_PATH).iterdir():
        api_file_path = module / API_MODULE_FILE
        if not module.is_dir() or not api_file_path.exists():
            continue
        with open(api_file_path, 'r') as file:
            module_descriptor = json.load(file)
        yield module_descriptor


def add_versions_to_allowed_modules(allowed_commands: dict) -> None:
    """
    Changes the given dict in place: sets 'version' for each mount point
    that belongs to an installed module
    :param allowed_commands:
    :return: None
    """
    for module_descriptor in _iter_module_descriptors():
        mount_point = module_descriptor[MOUNT_POINT_KEY]
        if mount_point in allowed_commands:
            allowed_commands[mount_point]['version'] = lib_version(
                module_descriptor[MODULE_NAME_KEY])


def resolve_user_available_components_version(allowed_commands: dict):
    """
    Collects versions of installed modules whose mount point is present in
    the given allowed commands
    :param allowed_commands:
    :return: dict, module name to its installed version
    """
    components_versions = {}
    for module_descriptor in _iter_module_descriptors():
        if module_descriptor[MOUNT_POINT_KEY] in allowed_commands:
            module_name = module_descriptor[MODULE_NAME_KEY]
            components_versions[module_name] = lib_version(module_name)
    return components_versions


@tracer.wrap()
@resolve_permissions(tracer=tracer, empty_cache=False)
def version(allowed_commands, user_meta):
    """
    Returns the Modular-API version and versions of the components
    available to the authenticated user.
    """
    _trace_id = get_trace_id(tracer=tracer)

    data = {'modular_api': __version__}
    components_version = resolve_user_available_components_version(
        allowed_commands
    )
    if components_version:
        data['components_version'] = components_version

    response_template = {
        "items": data,
        "table_title": 'User available component(s) version',
        "warnings": [],
        "message": None
    }
    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content=response_template
    )


@tracer.wrap()
def health_check():
    """
    Not authenticated endpoint, always responds with 200.
    """
    _trace_id = get_trace_id(tracer=tracer)
    return build_response(
        _trace_id=_trace_id,
        http_code=HTTPStatus.OK,
        content=None
    )


@tracer.wrap()
@resolve_permissions(tracer=tracer, empty_cache=True)
def stats(allowed_commands, user_meta):
    """
    Saves usage statistics sent in the JSON body. Responds with 400 if any
    of the required parameters is missing or empty.
    """
    _trace_id = get_trace_id(tracer=tracer)
    entry_request = request

    # request.json is None when the body is not JSON
    body = entry_request.json
    if not isinstance(body, dict):
        body = {}

    required_params = [EVENT_TYPE, PRODUCT, JOB_ID, STATUS, META]
    absent_params = [param for param in required_params
                     if not body.get(param)]
    if absent_params:
        return build_response(
            _trace_id=_trace_id,
            http_code=HTTPStatus.BAD_REQUEST,
            content=None
        )

    payload = {param: body.get(param) for param in required_params}
    USAGE_SERVICE.save_stats(request=entry_request, payload=payload)
    return build_response(_trace_id=_trace_id, content=None)


def __automated_relogin(request_item) -> bool:
    """
    Tells whether the client must log in again: True if the 'meta_version'
    stored in the token from the Authorization header differs from the
    current META_VERSION_ID, or if there is no Authorization header.
    :param request_item: bottle request
    :return: bool
    """
    header = request_item.headers.get('Authorization')
    if not header:
        return True
    # the last part of "<scheme> <token>" is the token itself
    raw_token = header.split(maxsplit=2)[-1]
    token = decode_jwt_token(raw_token)
    return token.get('meta_version') != META_VERSION_ID


def swagger_html():
    """
    Renders the Swagger html page pointing at the 'swagger-spec' route.
    """
    response.content_type = 'text/html'
    return SWAGGER_HTML.format(
        version='latest',
        url=request.app.get_url('swagger-spec')
    )


def swagger_spec():
    """
    Returns the Open API definition generated from all available commands.
    """
    route_meta_mapping = generate_route_meta_mapping(
        commands_meta=CONFIG.available_commands
    )
    response.content_type = 'application/json'
    return generate_definition(
        commands_def=route_meta_mapping,
        # fullpath with the trailing request path cut off
        prefix=request.fullpath[:-len(request.path)]
    )


def _remove_temp_files(temp_files) -> None:
    """
    Best-effort removal of temp files. A failure is logged and never
    raised, so cleanup cannot replace the response being returned.
    """
    for each_file in temp_files or ():
        try:
            os.remove(each_file)
        except OSError:
            _LOG.warning(f'Could not remove temp file {each_file}',
                         exc_info=True)


@tracer.wrap()
@resolve_permissions(tracer=tracer)
def index(path: str, allowed_commands=None, user_meta=None):
    """
    Catch-all endpoint: resolves the command by ``path`` among the commands
    allowed for the user, validates and converts request parameters, runs
    the click command handler and converts its output to a response.

    All exceptions are converted to an error response. Usage statistics
    are saved both for successful and failed executions.
    :param path: requested route taken from the url
    """
    _trace_id = get_trace_id(tracer=tracer)
    temp_files_list = []
    try:
        # TODO sort out relogin and remove
        # Basic auth carries no token, so the token is inspected only for
        # other auth types
        auth_type = request.headers.get('authorization')
        if auth_type and auth_type.startswith('Basic'):
            relogin_needed = False
        else:
            relogin_needed = __automated_relogin(request)
        if relogin_needed:
            # if you are going to change exception message - please change
            # correspond text in Modular-CLI
            raise ModularApiUnauthorizedException(
                'The provided token has expired due to updates in '
                'commands meta. Please get a new token from \'/login\' '
                'resource')

        version_warning, error_response = __validate_cli_version(
            _trace_id=_trace_id
        )
        if error_response:
            return error_response

        method = request.method
        route_meta_mapping = generate_route_meta_mapping(
            commands_meta=allowed_commands)

        command_def = route_meta_mapping.get(path)
        if not command_def:
            raise ModularApiBadRequestException('Can not found requested '
                                                'command')

        request_body_raw = extract_and_convert_parameters(
            request=request,
            command_def=command_def
        )
        request_body_raw = validate_request(command=command_def,
                                            req_params=request_body_raw,
                                            method=method,
                                            user_meta=user_meta)

        secure_parameters = command_def.get('secure_parameters', [])
        parameters, temp_files_list, _ = \
            convert_api_params(
                body=request_body_raw,
                command_def=command_def,
                secure_parameters=secure_parameters
            )

        _LOG.info('Request data: \npath={}\n'
                  'method={}'.format(path, method))

        command_handler_name = command_def.get('handler')
        route_path = command_def.get('route', {}).get('group_path')
        # the group path is the route without its last segment
        group_path = '/'.join(route_path.split('/')[:-1])
        correct_method = getattr(
            MODULE_GROUP_GROUP_OBJECT_MAPPING[group_path],
            command_handler_name)

        # todo get username from user_meta of somewhere else, but
        #  not from header again.
        username = username_from_jwt_token(
            token_from_auth_header(request.headers.get('Authorization'))
        )
        if not username:
            username, _ = request.auth

        # saving username to thread-local storage
        THREAD_LOCAL_STORAGE.set('modular_user', username)

        curr_user = SP.user_service.describe_user(username)
        # saving the meta.aux_data of user in the thread-local storage
        modular_user_meta_aux = {}
        if curr_user.meta:
            meta_dict = curr_user.meta.as_dict()
            aux_data = meta_dict.get(AUX_DATA)
            modular_user_meta_aux = (
                aux_data if isinstance(aux_data, dict) else {}
            )
        THREAD_LOCAL_STORAGE.set('modular_user_meta_aux',
                                 modular_user_meta_aux)

        # hopefully, token will be here... otherwise 500, I mean, it must be
        # here because the endpoint is authorized by token
        try:
            # obj goes to click.Context. Other module CLI should use it to
            # understand what user is making the request
            raw_command_response = correct_method.main(
                args=parameters,
                standalone_mode=False,
                obj={MODULAR_API_USERNAME: username}
            )
        except click.exceptions.UsageError as error:
            # just in case something is not handled
            return build_response(
                _trace_id=_trace_id,
                http_code=200,  # means that click worked,
                message=str(error)
            )

        # named so as not to shadow the imported bottle ``response``
        command_response = json.loads(raw_command_response)
        _LOG.info(
            f'Obtained response {command_response} for {_trace_id} request'
        )
        content, code = process_response(response=command_response)

        payload = {key.lower(): value
                   for key, value in command_response.items()}
        # TODO raises Value error sometimes
        USAGE_SERVICE.save_stats(request=request, payload=payload)

        # command warnings are kept and CLI version warnings are appended
        if content.get('warnings'):
            if version_warning:
                content['warnings'].extend(version_warning)
        else:
            content['warnings'] = version_warning
        return build_response(_trace_id=_trace_id,
                              http_code=code,
                              content=content)
    except Exception as e:
        _LOG.exception('Unexpected exception occurred')
        code, content = build_exception_content(exception=e)
        error_response = build_response(_trace_id=_trace_id,
                                        http_code=code,
                                        content=content)
        USAGE_SERVICE.save_stats(request=request, payload=content)
        return error_response
    finally:
        _remove_temp_files(temp_files_list)


class RateLimitMiddleware:
    """
    WSGI middleware that answers with 429 when the limiter rejects a hit
    for the request's REMOTE_ADDR, otherwise delegates to the wrapped app.
    """
    __slots__ = 'app', 'limiter', 'limit'

    def __init__(self, app: Callable, limiter: RateLimiter,
                 limit: RateLimitItem):
        self.app = app
        self.limiter = limiter
        self.limit = limit

    def __call__(self, environ, start_response):
        if not self.limiter.hit(self.limit, environ.get('REMOTE_ADDR')):
            _LOG.debug('Requests limit hit. Returning 429')
            c = HTTPStatus.TOO_MANY_REQUESTS
            start_response(
                f'{c.value} {c.phrase}',
                [('Content-Type', 'text/plain')],
            )
            return [HTTPStatus.TOO_MANY_REQUESTS.description.encode()]
        return self.app(environ, start_response)


class WSGIApplicationBuilder:
    """
    Builds the WSGI application: registers all routes, optionally mounts
    them under a prefix, optionally adds swagger routes and rate limiting.
    """

    def __init__(self, env: EnvironmentService, prefix: str = '',
                 swagger: bool = False, swagger_prefix: str = '/swagger'):
        self._env = env
        self._prefix = prefix
        self._swagger = swagger
        self._swagger_prefix = swagger_prefix

    @staticmethod
    def _build_generic_error_handler(code: HTTPStatus) -> Callable:
        """
        Builds a generic callback that handles a specific error code
        :param code:
        :return: callable returning a compact json with the code's phrase
        """
        def f(error):
            return json.dumps({'message': code.phrase},
                              separators=(',', ':'))
        return f

    def _register_errors(self, application: Bottle) -> None:
        """
        Registers generic json handlers for 404 and 500 errors
        :param application:
        :return: None
        """
        to_handle = (HTTPStatus.NOT_FOUND, HTTPStatus.INTERNAL_SERVER_ERROR)
        for code in to_handle:
            application.error_handler[code.value] = \
                self._build_generic_error_handler(code)

    def _rate_limited(self, app: Callable) -> Callable:
        """
        Wraps the given WSGI callable with the rate limiting middleware.
        Memory storage is used in SAAS mode, MongoDB storage otherwise.
        :param app:
        :return: wsgi callable
        """
        if self._env.mode() == ServiceMode.SAAS:
            # todo fix for saas, either implement storage for dynamodb
            #  or move completely to mongo, or use redis just for broker
            #  or use nginx and set rate limiting there.
            #  This MemoryStorage performs far from perfect when multiple
            #  processes and should not be used
            storage = MemoryStorage()
        else:
            storage = MongoDBStorage(
                uri=self._env.mongo_uri(),
                database_name=self._env.mongo_rate_limits_database()
            )
        return RateLimitMiddleware(
            app=app,
            limiter=MovingWindowRateLimiter(storage),
            limit=RateLimitItemPerSecond(
                self._env.api_calls_per_second_limit()
            )
        )

    def build(self) -> Callable:
        """
        It returns a wsgi callable, not a Bottle instance
        :return:
        """
        _LOG.info('Building WSGI application')
        child = Bottle()
        self._register_errors(child)

        if self._swagger:
            child.route(
                path='/swagger.json',
                method=HTTPMethod.GET,
                callback=swagger_spec,
                name='swagger-spec'
            )
            child.route(
                path=self._swagger_prefix,
                method=HTTPMethod.GET,
                callback=swagger_html
            )

        child.route(
            path='/doc',
            method=HTTPMethod.GET,
            callback=web_help
        )
        child.route(
            path='/doc<path:path>',
            method=HTTPMethod.GET,
            callback=generate_group_or_command_help
        )
        child.route(
            path='/login',
            method=HTTPMethod.GET,
            callback=login
        )
        child.route(
            path='/refresh',
            method=HTTPMethod.POST,
            callback=refresh
        )
        child.route(
            path='/version',
            method=HTTPMethod.GET,
            callback=version
        )
        child.route(
            path='/health_check',
            method=HTTPMethod.GET,
            callback=health_check
        )
        child.route(
            path='/stats',
            method=HTTPMethod.POST,
            callback=stats
        )
        # TODO use some prefix for all commands, because this catches
        #  everything
        child.route(
            path='<path:path>',
            method=[HTTPMethod.POST, HTTPMethod.GET],
            callback=index
        )

        if self._prefix:
            application = Bottle()
            self._register_errors(application)
            pr = f'/{self._prefix.strip("/")}/'
            application.mount(pr, child)
        else:
            application = child

        if self._env.is_rate_limiting_enabled():
            _LOG.info('Enabling rate limiting')
            application = self._rate_limited(application)
        _LOG.info('WSGI application was built')
        return application


def main():
    """
    Deprecated way to start the server: loads .env, initializes commands
    and runs the application on 127.0.0.1:8085.
    """
    load_dotenv(verbose=True)
    sys.stderr.write(
        '[WARNING] This way to start the server is deprecated and '
        'will be removed. Please, use "modular run" command instead\n'
    )
    initialize()  # can raise
    application = WSGIApplicationBuilder(SP.env).build()
    bottle.run(application, host='127.0.0.1', port=8085)


if __name__ == "__main__":
    main()