cli/srecli/group/__init__.py (436 lines of code) (raw):
import json
import operator
import shutil
import sys
import urllib.error
from abc import ABC, abstractmethod
from datetime import timezone
from functools import reduce, wraps
from http import HTTPStatus
from itertools import islice
from pathlib import Path
from typing import Any, Callable, TypedDict, cast
import click
from dateutil.parser import isoparse
from tabulate import tabulate
from srecli.service.adapter_client import CustodianApiClient, CustodianResponse
from srecli.service.config import (
AbstractCustodianConfig,
CustodianCLIConfig,
CustodianWithCliSDKConfig,
)
from srecli.service.constants import (
CONTEXT_MODULAR_ADMIN_USERNAME,
DATA_ATTR,
ITEMS_ATTR,
ERRORS_ATTR,
MESSAGE_ATTR,
MODULE_NAME,
NEXT_TOKEN_ATTR,
NO_CONTENT_RESPONSE_MESSAGE,
NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE,
JobType,
Env,
MODULAR_ADMIN,
STATUS_ATTR, SUCCESS_STATUS, ERROR_STATUS, CODE_ATTR, TABLE_TITLE_ATTR,
REVERT_TO_JSON_MESSAGE, COLUMN_OVERFLOW
)
from srecli.service.logger import get_logger, enable_verbose_logs
CredentialsProvider = None
try:
from modular_cli_sdk.services.credentials_manager import \
CredentialsProvider
except ImportError:
pass
_LOG = get_logger(__name__)
class TableException(Exception):
def __init__(self, table: str, message: str):
self._message = message
self._table = table
@property
def table(self):
return self._table
def __str__(self):
return self._message
class ColumnOverflow(TableException):
def __init__(self, table: str, message: str = COLUMN_OVERFLOW):
super().__init__(table=table, message=message)
class ContextObj(TypedDict):
"""
Make sure to sync it with constants, 'cause we cannot use variables
as keys in TypedDict
class ContextObj(TypedDict):
CONTEXT_CONFIG: CustodianCLIConfig
CONTEXT_API_CLIENT: CustodianApiClient
- that does not work
"""
config: AbstractCustodianConfig
api_client: CustodianApiClient
class cli_response: # noqa
__slots__ = ('_attributes_order', '_check_api_link', '_check_access_token')
def __init__(self, attributes_order: tuple[str, ...] = (),
check_api_link: bool = True,
check_access_token: bool = True):
self._attributes_order = attributes_order
self._check_api_link = check_api_link
self._check_access_token = check_access_token
@staticmethod
def to_exit_code(code: HTTPStatus | None) -> int:
if not code:
return 1
if 200 <= code < 400:
return 0
return 1
@staticmethod
def update_context(ctx: click.Context):
"""
Updates the given (current) click context's obj dict with api
client instance and config instance
:param ctx:
:return:
"""
if not isinstance(ctx.obj, dict):
ctx.obj = {}
if CredentialsProvider:
_LOG.debug('Cli sdk is installed. Using its credentials provider')
config = CustodianWithCliSDKConfig(
credentials_manager=CredentialsProvider(
module_name=MODULE_NAME, context=ctx
).credentials_manager
)
else:
_LOG.warning(
'Could not import modular_cli_sdk. Using standard '
'config instead of the one provided by cli skd'
)
m3_username = ctx.obj.get(CONTEXT_MODULAR_ADMIN_USERNAME)
if isinstance(m3_username, str): # basically if not None
config = CustodianCLIConfig(prefix=m3_username) # modular
else:
config = CustodianCLIConfig() # standard
# ContextObj
ctx.obj.update({
'api_client': CustodianApiClient(config),
'config': config
})
def _check_context(self, ctx: click.Context):
"""
May raise click.UsageError
:param ctx:
:return:
"""
obj: ContextObj = cast(ContextObj, ctx.obj)
config = obj['config']
if self._check_api_link and not config.api_link:
raise click.UsageError(
'Custodian Service API link is not configured. '
'Run \'sre configure\' and try again.'
)
if self._check_access_token and not config.access_token:
raise click.UsageError(
'Custodian access token not found. Run \'sre login\' '
'to receive the token'
)
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
modular_mode = False
if Path(__file__).parents[3].name == MODULAR_ADMIN: # TODO check some other way
modular_mode = True
json_view = Env.RESPONSE_FORMAT.get() == 'json' or kwargs.get('json')
verbose = Env.VERBOSE.get() or kwargs.get('verbose') # todo verbose can be enabled earlier if from env
kwargs.pop('json', None)
kwargs.pop('verbose', None)
if verbose:
enable_verbose_logs()
ctx = cast(click.Context, click.get_current_context())
self.update_context(ctx)
try:
self._check_context(ctx)
resp: CustodianResponse = click.pass_obj(func)(*args, **kwargs)
except click.ClickException as e:
_LOG.info('Click exception has occurred')
resp = response(e.format_message(), code=HTTPStatus.BAD_REQUEST)
except Exception as e:
_LOG.error(f'Unexpected error has occurred: {e}')
resp = response(str(e), code=HTTPStatus.INTERNAL_SERVER_ERROR)
if modular_mode:
_LOG.info('The cli is installed as a module. '
'Returning m3 modular cli response')
formatted = ModularResponseProcessor().format(resp)
return json.dumps(formatted, separators=(',', ':'))
if not json_view: # table view
_LOG.info('Returning table view')
prepared = TableResponseProcessor().format(resp)
trace_id = resp.trace_id
next_token = (resp.data or {}).get(NEXT_TOKEN_ATTR)
try:
printer = TablePrinter(
items_per_column=ctx.obj['config'].items_per_column,
attributes_order=self._attributes_order
)
table = printer.print(
prepared,
raise_on_overflow=not Env.NO_PROMPT.get()
)
except ColumnOverflow as ce:
_LOG.info(f'Awaiting user to respond to - {ce!r}.')
to_revert = click.prompt(
REVERT_TO_JSON_MESSAGE,
type=click.Choice(('y', 'n'))
)
if to_revert == 'n':
table = ce.table
else:
table, json_view = None, True
if table:
if verbose:
click.echo(f'Trace id: \'{trace_id}\'')
if next_token:
click.echo(f'Next token: \'{next_token}\'')
click.echo(table)
_LOG.info(f'Finished request: \'{trace_id}\'')
if json_view:
_LOG.info('Returning json view')
data = JsonResponseProcessor().format(resp)
click.echo(json.dumps(data, indent=4))
sys.exit(self.to_exit_code(resp.code))
return wrapper
class ResponseProcessor(ABC):
@abstractmethod
def format(self, resp: CustodianResponse) -> Any:
"""
Returns a dict that can be printed or used for printing
:param resp:
:return:
"""
class JsonResponseProcessor(ResponseProcessor):
"""
Processes the json before it can be printed
"""
def format(self, resp: CustodianResponse) -> dict:
if resp.code == HTTPStatus.NO_CONTENT:
return {MESSAGE_ATTR: NO_CONTENT_RESPONSE_MESSAGE}
elif isinstance(resp.exc, json.JSONDecodeError):
if not resp.data and resp.code:
return {MESSAGE_ATTR: resp.code.phrase}
return {MESSAGE_ATTR: f'Invalid JSON received: {resp.exc.msg}'}
elif isinstance(resp.exc, urllib.error.URLError):
return {MESSAGE_ATTR: f'Cannot send a request: {resp.exc.reason}'}
return resp.data or {}
class TableResponseProcessor(JsonResponseProcessor):
"""
Processes the json before it can be converted to table and printed
"""
def format(self, resp: CustodianResponse) -> list[dict]:
dct = super().format(resp)
if data := dct.get(DATA_ATTR):
return [data]
if errors := dct.get(ERRORS_ATTR):
return errors
if items := dct.get(ITEMS_ATTR):
return items
if ITEMS_ATTR in dct and not dct.get(ITEMS_ATTR): # empty
return [{MESSAGE_ATTR: NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE}]
return [dct]
class ModularResponseProcessor(JsonResponseProcessor):
modular_table_title = 'Syndicate Rule Engine'
@staticmethod
def _errors_to_message(errors: list[dict]) -> str:
"""
Modular cli accepts only messages if status code is not 200
:param errors:
:return:
"""
def _format_er(e):
loc = ''
first = True
for item in e.get('location') or ():
if isinstance(item, int):
loc += f'[{str(item)}]'
else:
if first:
loc += str(item)
else:
loc += f' -> {str(item)}'
first = False
description = e.get('description') or 'Invalid value'
if loc:
return f'{loc}: {description}'
else:
return description
return '\n'.join(map(_format_er, errors))
def format(self, resp: CustodianResponse) -> dict:
base = {
CODE_ATTR: resp.code or HTTPStatus.SERVICE_UNAVAILABLE.value,
STATUS_ATTR: SUCCESS_STATUS if resp.ok else ERROR_STATUS,
TABLE_TITLE_ATTR: self.modular_table_title
}
dct = super().format(resp)
if data := dct.get(DATA_ATTR):
base[ITEMS_ATTR] = [data]
elif errors := dct.get(ERRORS_ATTR):
base[MESSAGE_ATTR] = self._errors_to_message(errors)
elif dct.get(ITEMS_ATTR):
base.update(dct)
elif ITEMS_ATTR in dct: # empty
base[MESSAGE_ATTR] = NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE
elif message := dct.get(MESSAGE_ATTR):
base[MESSAGE_ATTR] = message
else:
base[ITEMS_ATTR] = [dct]
return base
class TablePrinter:
default_datetime_format: str = '%A, %B %d, %Y %I:%M:%S %p'
default_format = 'pretty'
def __init__(self, format: str = default_format,
datetime_format: str = default_datetime_format,
items_per_column: int | None = None,
attributes_order: tuple[str, ...] = ()):
self._format = format
self._datetime_format = datetime_format
self._items_per_column = items_per_column
if attributes_order:
self._order = {x: i for i, x in enumerate(attributes_order)}
else:
self._order = None
def prepare_value(self, value: str | list | dict | None) -> str:
"""
Makes the given value human-readable. Should be applied only for
table view since it can reduce the total amount of useful information
within the value in favor of better view.
:param value:
items per column.
:return:
"""
if not value and not isinstance(value, (int, bool)):
return '—'
limit = self._items_per_column
to_limit = limit is not None
f = self.prepare_value
# todo, maybe use just list comprehensions instead of iterators
match value:
case list():
i_recurse = map(f, value)
result = ', '.join(islice(i_recurse, limit))
if to_limit and len(value) > limit:
result += f'... ({len(value)})' # or len(value) - limit
return result
case dict():
i_prepare = (
f'{f(value=k)}: {f(value=v)}'
for k, v in islice(value.items(), limit)
)
result = reduce(lambda a, b: f'{a}; {b}', i_prepare)
if to_limit and len(value) > limit:
result += f'... ({len(value)})'
return result
case str():
try:
obj = isoparse(value)
# we assume that everything from the server is UTC even
# if it is a naive object
obj.replace(tzinfo=timezone.utc)
return obj.astimezone().strftime(self._datetime_format)
except ValueError:
return value
case _: # bool, int
return str(value)
def print(self, data: list[dict], raise_on_overflow: bool = True) -> str:
if order := self._order:
def key(tpl):
return order.get(tpl[0], 4096) # just some big int
formatted = self._items_table([
dict(sorted(dct.items(), key=key)) for dct in data
])
else:
formatted = self._items_table(data)
overflow = formatted.index('\n') > shutil.get_terminal_size().columns
if overflow and raise_on_overflow:
raise ColumnOverflow(table=formatted)
return formatted
def _items_table(self, items: list[dict]) -> str:
prepare_value = self.prepare_value
rows, title_to_key = [], {}
for entry in items:
for key in entry:
title = key.replace('_', ' ').capitalize() # title
if title not in title_to_key:
title_to_key[title] = key
for entry in items:
rows.append([
prepare_value(value=entry.get(key))
for key in title_to_key.values()
])
return tabulate(
rows, headers=list(title_to_key),
tablefmt=self._format
)
class ViewCommand(click.core.Command):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.params.append(
click.core.Option(
('--json',),
is_flag=True,
help='Response as a JSON'
)
)
self.params.append(
click.core.Option(
('--verbose',),
is_flag=True,
help='Save detailed information to the log file'
)
)
self.params.append(
click.core.Option(
('--customer_id', '-cid'),
type=str,
help='Hidden customer option to make a request on other '
'customer`s behalf. Only for system customer',
required=False,
hidden=True
)
)
def response(*args, **kwargs):
if kwargs.get('err'):
kwargs['code'] = HTTPStatus.BAD_REQUEST
kwargs.pop('err', None)
return CustodianResponse.build(*args, **kwargs)
# callbacks
def convert_in_upper_case_if_present(ctx, param, value):
if isinstance(value, (list, tuple)):
return [each.upper() for each in value]
elif value:
return value.upper()
def convert_in_lower_case_if_present(ctx, param, value):
if isinstance(value, (list, tuple)):
return [each.lower() for each in value]
elif value:
return value.lower()
def build_tenant_option(**kwargs) -> Callable:
params = dict(
type=str,
required=False,
help='Name of related tenant',
callback=convert_in_upper_case_if_present
)
params.update(kwargs)
return click.option('--tenant_name', '-tn', **params)
def build_account_option(**kwargs) -> Callable:
params = dict(
type=str,
required=False,
help='Cloud native account identifier'
)
params.update(kwargs)
return click.option('--account_number', '-acc', **params)
def build_iso_date_option(*args, **kwargs) -> Callable:
help_iso = 'ISO 8601 format. Example: 2021-09-22T00:00:00.000000'
params = dict(type=isoparse, required=False)
if 'help' in kwargs:
_help: str = kwargs.pop('help')
if help_iso not in _help:
_help = f'{_help.rstrip(".")}. {help_iso}'
kwargs['help'] = _help
params.update(kwargs)
return click.option(*args, **params)
def build_job_id_option(*args, **kwargs) -> Callable:
params = dict(
type=str, required=False,
help='Unique job identifier'
)
params.update(kwargs)
return click.option('--job_id', '-id', *args, **params)
def build_job_type_option(*args, **kwargs) -> Callable:
params = dict(
type=click.Choice(tuple(map(operator.attrgetter('value'), JobType))),
help='Specify type of jobs to retrieve.',
required=False
)
params.update(kwargs)
return click.option('--job_type', '-jt', *args, **params)
def build_rule_source_id_option(**kwargs) -> Callable:
params = dict(
type=str, required=False,
help='Unique rule-source identifier.'
)
params.update(**kwargs)
return click.option('--rule_source_id', '-rsid', **params)
def build_limit_option(**kwargs) -> Callable:
params = dict(
type=click.IntRange(min=1, max=50),
default=10, show_default=True,
help='Number of records to show'
)
params.update(kwargs)
return click.option('--limit', '-l', **params)
tenant_option = build_tenant_option()
account_option = build_account_option()
optional_job_type_option = build_job_type_option()
from_date_iso_args = ('--from_date', '-from')
to_date_iso_args = ('--to_date', '-to')
from_date_report_option = build_iso_date_option(
*from_date_iso_args, required=False,
help='Generate report FROM date.'
)
to_date_report_option = build_iso_date_option(
*to_date_iso_args, required=False,
help='Generate report TILL date.'
)
limit_option = build_limit_option()
next_option = click.option('--next_token', '-nt', type=str, required=False,
help='Token to start record-pagination from')