modular-service-cli/modular_service_cli/group/__init__.py (316 lines of code) (raw):
import json
import os
import urllib.error
from abc import ABC, abstractmethod
from datetime import timezone
from functools import reduce, wraps
from http import HTTPStatus
from itertools import islice
from pathlib import Path
from typing import Any, Callable, Literal, TypedDict, cast
import click
from dateutil.parser import isoparse
from tabulate import tabulate
from modular_service_cli.service.api_client import (
ApiResponse,
ModularServiceApiClient
)
from modular_service_cli.service.config import (
AbstractConfig,
ModularCliSdkConfig,
OnDiskModularServiceConfig,
)
from modular_service_cli.service.constants import (
DATA_ATTR,
ERRORS_ATTR,
ITEMS_ATTR,
MESSAGE_ATTR,
NEXT_TOKEN_ATTR,
NO_CONTENT_RESPONSE_MESSAGE,
NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE,
)
from modular_service_cli.service.logger import get_logger, write_verbose_logs
# modular_cli_sdk is optional: when it cannot be imported the name is bound
# to None so that later code can simply test ``if CredentialsProvider:``.
try:
    from modular_cli_sdk.services.credentials_manager import (
        CredentialsProvider,
    )
except ImportError:
    CredentialsProvider = None

# Module-level logger used by the helpers defined below.
_LOG = get_logger(__name__)
# Prompt shown to the user when the rendered table is wider than the
# terminal and a JSON fallback is offered instead.
REVERT_TO_JSON_MESSAGE = (
    'The command`s response is pretty huge and the '
    'result table structure can be broken.\n'
    'Do you want to show the response in the JSON format?'
)
# Default message carried by ColumnOverflow.
COLUMN_OVERFLOW = 'Column has overflown, within the table representation.'


class ColumnOverflow(Exception):
    """
    Signals that a rendered table does not fit into the terminal width.

    The already rendered table travels with the exception so that the
    caller can still print it if the user decides to.
    """
    __slots__ = 'table', 'message'

    def __init__(self, table: str, message: str = COLUMN_OVERFLOW):
        """
        :param table: rendered table that caused the overflow
        :param message: human-readable description of the problem
        """
        # Pass the message to Exception so that str(), repr() and tracebacks
        # show it. Without this call the exception renders as an empty
        # string whenever ``table`` is given by keyword.
        super().__init__(message)
        self.table = table
        self.message = message
class ContextObj:
    """
    Value stored in ``click.Context.obj``: bundles the active configuration
    with the API client built from it.
    """
    __slots__ = ('config', 'api_client')

    def __init__(self, config: AbstractConfig,
                 api_client: ModularServiceApiClient):
        """
        :param config: configuration object (an ``AbstractConfig``)
        :param api_client: client (a ``ModularServiceApiClient``)
        """
        self.config = config
        self.api_client = api_client
class cli_response:  # noqa
    """
    Decorator for click command callbacks.

    The wrapped callback receives the ``ContextObj`` as its first argument
    (via ``click.pass_obj``) and is expected to return an ``ApiResponse``.
    The decorator renders that response as a modular cli JSON string, as a
    table, or as plain JSON.
    """
    __slots__ = ('_attributes_order', '_check_api_link', '_check_access_token')

    def __init__(self, attributes_order: tuple[str, ...] = (),
                 check_api_link: bool = True,
                 check_access_token: bool = True):
        """
        :param attributes_order: column names that should come first in the
            table view, in the given order
        :param check_api_link: fail early if the API link is not configured
        :param check_access_token: fail early if there is no access token
        """
        self._attributes_order = attributes_order
        self._check_api_link = check_api_link
        self._check_access_token = check_access_token

    @staticmethod
    def update_context(ctx: click.Context):
        """
        Replaces the given (current) click context's obj with a
        ``ContextObj`` holding a config instance and an api client instance.
        :param ctx: current click context
        :return: None
        """
        if CredentialsProvider:
            _LOG.debug('Cli sdk is installed. Using its credentials provider')
            config = ModularCliSdkConfig(
                credentials_manager=CredentialsProvider(
                    module_name='modular-service-cli', context=ctx
                ).credentials_manager
            )
        else:
            _LOG.warning(
                'Could not import modular_cli_sdk. Using standard '
                'config instead of the one provided by cli skd'
            )
            m3_username = None
            if isinstance(ctx.obj, dict):
                m3_username = ctx.obj.get('modular_admin_username')
            if isinstance(m3_username, str):  # basically if not None
                # modular
                config = OnDiskModularServiceConfig(prefix=m3_username)
            else:
                # standard
                config = OnDiskModularServiceConfig()
        ctx.obj = ContextObj(
            config=config,
            api_client=ModularServiceApiClient(config)
        )

    def _check_context(self, ctx: click.Context):
        """
        Verifies that the configuration has what the command requires.
        May raise click.UsageError
        :param ctx: click context whose obj is a ``ContextObj``
        :return: None
        """
        obj: ContextObj = cast(ContextObj, ctx.obj)
        config = obj.config
        if self._check_api_link and not config.api_link:
            raise click.UsageError(
                'Modular service API link is not configured. '
                'Run \'modular-service configure\' and try again.'
            )
        if self._check_access_token and not config.access_token:
            raise click.UsageError(
                'Modular access token not found. '
                'Run \'modular-service login\' to receive the token'
            )

    @staticmethod
    def _is_modular_mode() -> bool:
        """
        Tells whether the package seems to be installed as a module, i.e.
        the fourth ancestor directory of this file is called ``modules``.
        :return: True if modular mode is detected
        """
        # TODO check some other way
        ancestors = Path(__file__).parents
        # a shallow path has fewer than four ancestors; indexing it directly
        # would raise IndexError, so treat that as "not modular"
        return len(ancestors) > 3 and ancestors[3].name == 'modules'

    def __call__(self, func: Callable) -> Callable:
        """
        Wraps the command callback.
        :param func: callback accepting the ``ContextObj`` first and
            returning an ``ApiResponse``
        :return: wrapped callback; it returns a JSON string in modular mode
            and None otherwise (the output is echoed instead)
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            modular_mode = self._is_modular_mode()
            # both flags are added by ViewCommand; default to False so that
            # a command declared without them does not fail with KeyError
            json_view = kwargs.pop('json', False)
            verbose = kwargs.pop('verbose', False)
            if verbose:
                write_verbose_logs()
            ctx = cast(click.Context, click.get_current_context())
            self.update_context(ctx)
            try:
                self._check_context(ctx)
                resp: ApiResponse = click.pass_obj(func)(*args, **kwargs)
            except click.ClickException as e:
                _LOG.info('Click exception has occurred')
                resp = ApiResponse.build(e.format_message())
            except Exception as e:
                # top-level boundary: any failure becomes a printable response
                _LOG.exception('Unexpected error has occurred')
                resp = ApiResponse.build(str(e))

            if modular_mode:
                _LOG.info('The cli is installed as a module. '
                          'Returning m3 modular cli response')
                formatted = ModularResponseProcessor().format(resp)
                return json.dumps(formatted, separators=(',', ':'))

            if not json_view:  # table view
                _LOG.info('Returning table view')
                prepared = TableResponseProcessor().format(resp)
                trace_id = resp.trace_id
                next_token = (resp.data or {}).get(NEXT_TOKEN_ATTR)
                try:
                    printer = TablePrinter(
                        attributes_order=self._attributes_order
                    )
                    table = printer.print(prepared)
                except ColumnOverflow as ce:
                    _LOG.info(f'Awaiting user to respond to - {ce!r}.')
                    to_revert = click.prompt(
                        REVERT_TO_JSON_MESSAGE,
                        type=click.Choice(('y', 'n'))
                    )
                    if to_revert == 'n':
                        # the user wants the table despite the overflow
                        table = ce.table
                    else:
                        # fall through to the json branch below
                        table, json_view = None, True
                if table:
                    if verbose:
                        click.echo(f'Trace id: \'{trace_id}\'')
                    if next_token:
                        click.echo(f'Next token: \'{next_token}\'')
                    click.echo(table)
                    _LOG.info(f'Finished request: \'{trace_id}\'')

            if json_view:
                _LOG.info('Returning json view')
                data = JsonResponseProcessor().format(resp)
                click.echo(json.dumps(data, indent=2))

        return wrapper
class ResponseProcessor(ABC):
    """
    Interface of objects that turn an ``ApiResponse`` into something
    printable.
    """

    @abstractmethod
    def format(self, resp: ApiResponse) -> Any:
        """
        Converts the response into data that can be printed directly or
        passed on to a printer.
        :param resp: response to convert
        :return: printable representation; the concrete type is chosen by
            the subclass
        """
class JsonResponseProcessor(ResponseProcessor):
    """
    Turns an ``ApiResponse`` into the dict shown in the JSON view
    """

    def format(self, resp: ApiResponse) -> dict:
        """
        :param resp: response to convert
        :return: a message dict for "no content" responses and for
            JSON-decoding / URL errors, otherwise the response data
            (an empty dict when there is none)
        """
        if resp.code == HTTPStatus.NO_CONTENT:
            return {MESSAGE_ATTR: NO_CONTENT_RESPONSE_MESSAGE}

        failure = resp.exc
        if isinstance(failure, json.JSONDecodeError):
            return {MESSAGE_ATTR: f'Invalid JSON received: {failure.msg}'}
        if isinstance(failure, urllib.error.URLError):
            return {MESSAGE_ATTR: f'Cannot send a request: {failure.reason}'}

        payload = resp.data
        return payload if payload else {}
class TableResponseProcessor(JsonResponseProcessor):
    """
    Turns an ``ApiResponse`` into the list of rows shown in the table view
    """

    def format(self, resp: ApiResponse) -> list[dict]:
        """
        :param resp: response to convert
        :return: list of dicts, one per table row
        """
        payload = super().format(resp)

        # a single object becomes a one-row table
        single = payload.get(DATA_ATTR)
        if single:
            return [single]

        # otherwise the first non-empty collection wins: errors, then items
        for attr in (ERRORS_ATTR, ITEMS_ATTR):
            rows = payload.get(attr)
            if rows:
                return rows

        # the items key is present but holds nothing
        if ITEMS_ATTR in payload:
            return [{MESSAGE_ATTR: NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE}]

        # anything else (e.g. a plain message) is shown as one row
        return [payload]
class ModularResponseProcessor(JsonResponseProcessor):
    """
    Builds the envelope that is returned when the cli runs in modular mode
    """
    # NOTE(review): the title mentions "Custodian" although this package is
    # modular-service-cli - confirm that this is intended
    modular_table_title = 'Custodian as a service'

    class ModularResponse(TypedDict, total=False):
        code: HTTPStatus
        status: Literal['SUCCESS']
        table_title: str
        items: list[str] | None
        message: str | None

    def format(self, resp: ApiResponse) -> ModularResponse:
        """
        :param resp: response to convert
        :return: dict with code, status and table title plus either items
            or a message
        """
        envelope = dict(
            code=resp.code,
            status='SUCCESS',
            table_title=self.modular_table_title,
        )
        payload = super().format(resp)

        single = payload.get(DATA_ATTR)
        if single:
            envelope[ITEMS_ATTR] = [single]
            return envelope

        errors = payload.get(ERRORS_ATTR)
        if errors:
            envelope[ITEMS_ATTR] = errors
            return envelope

        if payload.get(ITEMS_ATTR):
            # copies every key of the payload, not only the items
            envelope.update(payload)
            return envelope

        if ITEMS_ATTR in payload:  # present but empty
            envelope[MESSAGE_ATTR] = NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE
            return envelope

        message = payload.get(MESSAGE_ATTR)
        if message:
            envelope[MESSAGE_ATTR] = message
        else:
            envelope[ITEMS_ATTR] = [payload]
        return envelope
class TablePrinter:
__slots__ = '_format', '_datetime_format', '_items_per_column', '_order'
default_datetime_format: str = '%A, %B %d, %Y %I:%M:%S %p'
def __init__(self, format: str = 'pretty',
datetime_format: str = default_datetime_format,
items_per_column: int | None = None,
attributes_order: tuple[str, ...] = ()):
self._format = format
self._datetime_format = datetime_format
self._items_per_column = items_per_column
if attributes_order:
self._order = {x: i for i, x in enumerate(attributes_order)}
else:
self._order = None
def prepare_value(self, value: str | list | dict | None) -> str:
"""
Makes the given value human-readable. Should be applied only for
table view since it can reduce the total amount of useful information
within the value in favor of better view.
:param value:
items per column.
:return:
"""
if not value and not isinstance(value, (int, bool)):
return '—'
limit = self._items_per_column
to_limit = limit is not None
f = self.prepare_value
# todo, maybe use just list comprehensions instead of iterators
match value:
case list():
i_recurse = map(f, value)
result = ', '.join(islice(i_recurse, limit))
if to_limit and len(value) > limit:
result += f'... ({len(value)})' # or len(value) - limit
return result
case dict():
i_prepare = (
f'{f(value=k)}: {f(value=v)}'
for k, v in islice(value.items(), limit)
)
result = reduce(lambda a, b: f'{a}; {b}', i_prepare)
if to_limit and len(value) > limit:
result += f'... ({len(value)})'
return result
case str():
try:
obj = isoparse(value)
# we assume that everything from the server is UTC even
# if it is a naive object
obj.replace(tzinfo=timezone.utc)
return obj.astimezone().strftime(self._datetime_format)
except ValueError:
return value
case _: # bool, int
return str(value)
def print(self, data: list[dict]) -> str:
"""
Raises on overflow
:param data:
:return:
"""
if order := self._order:
def key(tpl):
return order.get(tpl[0], 4096) # just some big int
formatted = self._items_table([
dict(sorted(dct.items(), key=key)) for dct in data
])
else:
formatted = self._items_table(data)
overflow = formatted.index('\n') > os.get_terminal_size().columns
if overflow:
raise ColumnOverflow(table=formatted)
return formatted
def _items_table(self, items: list[dict]) -> str:
prepare_value = self.prepare_value
rows, title_to_key = [], {}
for entry in items:
for key in entry:
title = key.replace('_', ' ').capitalize() # title
if title not in title_to_key:
title_to_key[title] = key
for entry in items:
rows.append([
prepare_value(value=entry.get(key))
for key in title_to_key.values()
])
return tabulate(
rows, headers=list(title_to_key),
tablefmt=self._format
)
class ViewCommand(click.core.Command):
    """
    Command that always carries the ``--json`` and ``--verbose`` flags and
    a hidden ``--customer_id`` option in addition to its own parameters.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        json_flag = click.core.Option(
            ('--json',), is_flag=True, help='Response as a JSON'
        )
        verbose_flag = click.core.Option(
            ('--verbose',), is_flag=True,
            help='Save detailed information to '
                 'the log file'
        )
        customer_option = click.core.Option(
            ('--customer_id', '-cid'), type=str,
            required=False, hidden=True
        )
        # appended after the command's own parameters, in this order
        self.params.extend((json_flag, verbose_flag, customer_option))
def build_limit_option(**kwargs) -> Callable:
    """
    Builds the ``--limit`` / ``-l`` click option.
    :param kwargs: click option arguments that override or extend the
        defaults (integer from 1 to 50, default 10, default shown in help)
    :return: decorator produced by ``click.option``
    """
    defaults = {
        'type': click.IntRange(min=1, max=50),
        'default': 10,
        'show_default': True,
        'help': 'Number of records to show',
    }
    return click.option('--limit', '-l', **{**defaults, **kwargs})
def build_next_token_option(**kwargs) -> Callable:
    """
    Builds the ``--next_token`` / ``-nt`` click option.
    :param kwargs: click option arguments that override or extend the
        defaults (optional string)
    :return: decorator produced by ``click.option``
    """
    defaults = {
        'type': str,
        'required': False,
        'help': 'Token to start record-pagination from',
    }
    return click.option('--next_token', '-nt', **{**defaults, **kwargs})