syndicate/core/helper.py (670 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import concurrent.futures
import getpass
import json
import os
import re
import subprocess
import sys
import logging
import traceback
import zipfile
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path
from signal import SIGINT
from threading import Thread
from time import time
import click
from click import BadParameter
from tqdm import tqdm
from syndicate.commons.log_helper import get_logger, get_user_logger, \
LOG_FORMAT_FOR_CONSOLE
from syndicate.core.conf.processor import path_resolver
from syndicate.core.conf.validator import ConfigValidator, ALL_REGIONS
from syndicate.core.constants import (BUILD_META_FILE_NAME,
DEFAULT_SEP, DATE_FORMAT_ISO_8601,
CUSTOM_AUTHORIZER_KEY, OK_RETURN_CODE,
ABORTED_RETURN_CODE, FAILED_RETURN_CODE)
from syndicate.core.project_state.project_state import MODIFICATION_LOCK, \
WARMUP_LOCK, ProjectState
from syndicate.core.project_state.sync_processor import sync_project_state
_LOG = get_logger(__name__)
USER_LOG = get_user_logger()
CONF_PATH = os.environ.get('SDCT_CONF')
def unpack_kwargs(handler_func):
""" Decorator for unpack kwargs.
:type handler_func: func
:param handler_func: function which will be decorated
"""
@wraps(handler_func)
def wrapper(*kwargs):
""" Wrapper func."""
parameters = {}
for i in kwargs:
if type(i) is dict:
parameters = i
return handler_func(**parameters)
return wrapper
def failed_status_code_on_exception(handler_func):
""" Decorator to catch all exceptions and fail stage execution
:type handler_func: func
:param handler_func: function which will be decorated
"""
@wraps(handler_func)
def wrapper(*args, **kwargs):
""" Wrapper func."""
try:
return handler_func(*args, **kwargs)
except Exception as e:
USER_LOG.error(f'An error occurred: {str(e)}')
_LOG.exception(traceback.format_exc())
return FAILED_RETURN_CODE
return wrapper
def prettify_json(obj):
return json.dumps(obj, indent=4)
def execute_command_by_path(command, path):
result = subprocess.run(command, shell=True, cwd=path, capture_output=True,
text=True)
if result.returncode != 0:
msg = (f'While running the command "{command}" occurred an error:\n'
f'"{result.stdout}\n{result.stderr}"')
raise AssertionError(msg)
_LOG.info(f'Running the command "{command}"\n{result.stdout}'
f'\n{result.stderr}')
def build_path(*paths):
return DEFAULT_SEP.join(paths)
def _find_alias_and_replace(some_string):
""" Find placeholder for alias in string. If found - replace with alias
value.
:type some_string: str
"""
from syndicate.core import CONFIG
first_index = some_string.index('${')
second_index = some_string.index('}')
alias_name = some_string[first_index + 2:second_index]
res_alias = CONFIG.resolve_alias(alias_name)
if not res_alias:
raise AssertionError('Can not found alias for {0}'.format(alias_name))
result = (
some_string[:first_index] + res_alias + some_string[
second_index + 1:])
return result
def resolve_aliases_for_string(string_value):
""" Look for aliases in string.
:type string_value: str
"""
input_string = string_value
try:
if '${' in string_value:
if string_value.count('${') == string_value.count('}'):
while True:
input_string = _find_alias_and_replace(input_string)
else:
raise AssertionError('Broken alias in value: {0}.'.format(
string_value))
return input_string
except ValueError:
return input_string
def check_required_param(ctx, param, value):
if not value:
raise BadParameter('Parameter is required')
return value
def param_to_lower(ctx, param, value):
if isinstance(value, tuple):
return tuple([a_value.lower() for a_value in value])
if isinstance(value, str):
return value.lower()
def resolve_path_callback(ctx, param, value):
if not value:
raise BadParameter('Parameter is required')
return path_resolver(value)
def generate_default_bundle_name(ctx, param, value):
if value:
return value
from syndicate.core import CONFIG
# regex to replace all special characters except dash, underscore and dot
pattern = re.compile('[^0-9a-zA-Z.\-_]')
project_path = CONFIG.project_path
project_name = project_path.split("/")[-1]
result_project_name = re.sub(pattern, '', project_name)
date = datetime.now().strftime("%y%m%d.%H%M%S")
bundle_name = f'{result_project_name}_{date}'
if len(bundle_name) > 63:
USER_LOG.warn(f'Bundle name \'{bundle_name}\' is too long. Trim it to '
f'the last 63 characters. Please rename project to '
f'shorter name to avoid this warning.')
return bundle_name[-63:]
return bundle_name
def resolve_default_bundle_name(command_name):
from syndicate.core import PROJECT_STATE
if command_name in 'clean':
bundle_name = PROJECT_STATE.latest_deployed_bundle_name
else:
bundle_name = PROJECT_STATE.latest_bundle_name
if not bundle_name:
USER_LOG.error(
'Property \'bundle\' could not be resolved from the syndicate '
'project state file.'
)
return ABORTED_RETURN_CODE
return bundle_name
def resolve_default_deploy_name(command_name):
from syndicate.core import PROJECT_STATE
if command_name in ('clean', 'update'):
deploy_name = PROJECT_STATE.latest_deployed_deploy_name
else:
deploy_name = PROJECT_STATE.default_deploy_name
return deploy_name
param_resolver_map = {
'bundle_name': resolve_default_bundle_name,
'deploy_name': resolve_default_deploy_name
}
def resolve_default_value(ctx, param, value):
if value:
return value
sync_project_state()
command_name = ctx.info_name
param_resolver = param_resolver_map.get(param.name)
if not param_resolver:
raise AssertionError(
f'There is no resolver of default value '
f'for param {param.name}')
resolved_value = param_resolver(command_name=command_name)
USER_LOG.info(f'Resolved value of {param.name}: {resolved_value}')
return resolved_value
def create_bundle_callback(ctx, param, value):
from syndicate.core.build.helper import resolve_bundle_directory
if not value:
raise BadParameter('Parameter is required')
bundle_path = resolve_bundle_directory(value)
if not os.path.exists(bundle_path):
os.makedirs(bundle_path)
return value
def verify_bundle_callback(ctx, param, value):
from syndicate.core.build.helper import resolve_bundle_directory
bundle_path = resolve_bundle_directory(value)
if not os.path.exists(bundle_path):
raise click.BadParameter(
"Bundle name does not exist. Please, invoke "
"'syndicate assemble' command to create a bundle.")
return value
def verify_meta_bundle_callback(ctx, param, value):
from syndicate.core.build.helper import resolve_bundle_directory
bundle_path = resolve_bundle_directory(value)
build_meta_path = os.path.join(bundle_path, BUILD_META_FILE_NAME)
if not os.path.exists(build_meta_path):
raise click.BadParameter(
"Bundle name is incorrect. {0} does not exist. Please, invoke "
"'package_meta' command to create a file.".format(
BUILD_META_FILE_NAME))
return value
def resolve_and_verify_bundle_callback(ctx, param, value):
if not value:
_LOG.debug(f'{param.name} is not specified, latest build will be used')
value = resolve_default_value(ctx, param, value)
if not value:
raise click.BadParameter(
f'Couldn\'t resolve the parameter automatically. '
f'Try to specify it manually'
)
return verify_meta_bundle_callback(ctx, param, value)
def write_content_to_file(file_path, file_name, obj):
file_name = os.path.join(file_path, file_name)
if os.path.exists(file_name):
_LOG.warn('{0} already exists'.format(file_name))
else:
folder_path = Path(file_path)
folder_path.mkdir(parents=True, exist_ok=True)
meta_file = Path(file_name)
meta_file.touch(exist_ok=True)
with open(file_name, 'w+') as meta_file:
json.dump(obj, meta_file)
_LOG.info('{0} file was created.'.format(meta_file.name))
def sync_lock(lock_type):
def real_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
sync_project_state()
from syndicate.core import PROJECT_STATE
if PROJECT_STATE.is_lock_free(lock_type):
PROJECT_STATE.acquire_lock(lock_type)
sync_project_state()
else:
raise AssertionError(f'The project {lock_type} is locked.')
try:
result = func(*args, **kwargs)
except Exception as e:
_LOG.exception("Error occurred: %s", str(e))
from syndicate.core import PROJECT_STATE
PROJECT_STATE.release_lock(lock_type)
sync_project_state()
raise
PROJECT_STATE.release_lock(lock_type)
sync_project_state()
return result
return wrapper
return real_wrapper
def timeit(action_name=None):
def internal_timeit(func):
@wraps(func)
def timed(*args, **kwargs):
ts = time()
result = func(*args, **kwargs)
te = time()
_LOG.info('Stage %s, elapsed time: %s', func.__name__,
str(timedelta(seconds=te - ts)))
result_action_name = result.get('operation') if \
isinstance(result, dict) else None
if result_action_name:
result = result.get('return_code', OK_RETURN_CODE)
if action_name:
username = getpass.getuser()
duration = round(te - ts, 3)
start_date_formatted = datetime.fromtimestamp(ts) \
.strftime(DATE_FORMAT_ISO_8601)
end_date_formatted = datetime.fromtimestamp(te) \
.strftime(DATE_FORMAT_ISO_8601)
bundle_name = kwargs.get('bundle_name')
deploy_name = kwargs.get('deploy_name')
rollback_on_error = kwargs.get('rollback_on_error')
from syndicate.core import PROJECT_STATE
PROJECT_STATE.log_execution_event(
operation=result_action_name or action_name,
initiator=username,
bundle_name=bundle_name,
deploy_name=deploy_name,
time_start=start_date_formatted,
time_end=end_date_formatted,
duration_sec=duration,
status=result,
rollback_on_error=rollback_on_error
)
return result
return timed
return internal_timeit
def execute_parallel_tasks(*fns):
threads = []
for fn in fns:
t = Thread(target=fn)
t.start()
threads.append(t)
for t in threads:
t.join()
def handle_futures_progress_bar(futures):
kwargs = {
'total': len(futures),
'unit': 'nap',
'leave': True
}
for _ in tqdm(concurrent.futures.as_completed(futures), **kwargs):
pass
def string_to_camel_case(s: str):
temp = s.split('_')
res = temp[0] + ''.join(ele.title() for ele in temp[1:])
return str(res)
def string_to_upper_camel_case(s: str):
temp = s.split('_')
res = ''.join(ele.title() for ele in temp)
return str(res)
def dict_keys_to_camel_case(d: dict):
return _inner_dict_keys_to_camel_case(d, string_to_camel_case)
def dict_keys_to_upper_camel_case(d: dict):
return _inner_dict_keys_to_camel_case(d, string_to_upper_camel_case)
def _inner_dict_keys_to_camel_case(d: dict, case_formatter):
new_d = {}
for key, value in d.items():
if isinstance(value, (str, int, float)):
new_d[case_formatter(key)] = value
if isinstance(value, list):
new_list = []
for index, item in enumerate(value):
if isinstance(item, (str, int, float)):
new_list.append(item)
if isinstance(item, dict):
new_list.append(dict_keys_to_camel_case(item))
new_d[case_formatter(key)] = new_list
if isinstance(value, dict):
new_d[case_formatter(key)] = dict_keys_to_camel_case(value)
return new_d
def string_to_capitalized_camel_case(s: str):
temp = s.split('_')
res = ''.join(ele.capitalize() for ele in temp)
return res
def dict_keys_to_capitalized_camel_case(d: dict):
new_d = {}
for key, value in d.items():
if isinstance(value, (str, int)):
new_d[string_to_capitalized_camel_case(key)] = value
if isinstance(value, list):
new_list = []
for index, item in enumerate(value):
if isinstance(item, (str, int)):
new_list.append(item)
if isinstance(item, dict):
new_list.append(dict_keys_to_capitalized_camel_case(item))
new_d[string_to_capitalized_camel_case(key)] = new_list
if isinstance(value, dict):
new_d[string_to_capitalized_camel_case(key)] = \
dict_keys_to_capitalized_camel_case(value)
return new_d
class OrderedGroup(click.Group):
def __init__(self, name=None, commands=None, **attrs):
super(OrderedGroup, self).__init__(name, commands, **attrs)
self.commands = commands or collections.OrderedDict()
def list_commands(self, ctx):
return self.commands
class OptionRequiredIf(click.Option):
def __init__(self, *args, **kwargs):
self.required_if = kwargs.pop('required_if')
self.required_if_values = kwargs.pop('required_if_values', [])
if not self.required_if:
raise AssertionError("'required_if' param must be specified")
super().__init__(*args, **kwargs)
def handle_parse_result(self, ctx, opts, args):
is_current_present: bool = self.name in opts
if self.required_if_values:
is_required_ok: bool = \
opts.get(self.required_if) in self.required_if_values
message = (
f"Option '{self.name}' and '{self.required_if}' must be "
f"specified together, and '{self.required_if}' must have one "
f"of the next values {self.required_if_values}")
else:
is_required_ok: bool = self.required_if in opts
message = (f"options: '{self.name}' and '{self.required_if}' "
f"must be specified together")
if is_current_present ^ is_required_ok:
raise click.UsageError(message)
else:
return super().handle_parse_result(ctx, opts, args)
class ValidRegionParamType(click.types.StringParamType):
ALL_VALUE = 'ALL'
name = 'region'
def __init__(self, allowed_all=False):
self.allowed_all = allowed_all
def convert(self, value, param, ctx):
value = super().convert(value, param, ctx)
if self.allowed_all and value.upper() == self.ALL_VALUE:
_LOG.info("The value is 'ALL' and 'allowed_all=True', returning..")
return value.lower()
_LOG.info(f"Checking whether {value} is a valid region...")
if value not in ALL_REGIONS:
_LOG.error(f"Invalid region '{value}' was given")
self.fail(f"Value '{value}' is not a valid region. Try one of "
f"these: {ALL_REGIONS}", param, ctx)
_LOG.info(f"Value '{value}' is a valid region, returning..")
return value
def get_metavar(self, param):
shorten_regions = [ALL_REGIONS[0], "...", ALL_REGIONS[-1]]
if self.allowed_all:
shorten_regions.insert(0, self.ALL_VALUE)
return f"[{'|'.join(shorten_regions)}]"
class DictParamType(click.types.StringParamType):
name = 'dict'
ITEMS_SEPARATOR = ','
KEY_VALUE_SEPARATOR = ':'
def convert(self, value, param, ctx):
value = super().convert(value, param, ctx)
_LOG.info(f'Stripping {value} from "{self.ITEMS_SEPARATOR}" a bit..')
value = value[1:] if value.startswith(self.ITEMS_SEPARATOR) else value
value = value[:-1] if value.endswith(self.ITEMS_SEPARATOR) else value
result = {}
_LOG.info(f'Converting: {value} to dict..')
try:
for item in value.split(self.ITEMS_SEPARATOR):
k, v = item.split(self.KEY_VALUE_SEPARATOR)
result[k.lstrip().rstrip()] = v.lstrip().rstrip()
except ValueError as e:
raise BadParameter(
f'Wrong format: "{value}". '
f'Must be "key:value" or "key1:value1,key2:value2". '
f'\nError: {e.__str__()}')
_LOG.info(f'Converted to such a dict: {result}')
return result
def get_metavar(self, param):
return f'KEY1{self.KEY_VALUE_SEPARATOR}VALUE1' \
f'{self.ITEMS_SEPARATOR}KEY2{self.KEY_VALUE_SEPARATOR}VALUE2'
class DeepDictParamType(click.types.StringParamType):
name = 'deep-dict'
ITEMS_SEPARATOR = ','
SUB_KEY_VALUE_SEPARATOR = ':'
MAIN_KEY_VALUE_SEPARATOR = ';'
def convert(
self,
value: str,
param: click.Parameter | None,
ctx: click.Context | None,
) -> dict[str, dict[str, str]]:
"""
Convert a string representation of nested key-value pairs into a
dictionary of dictionaries
**Example**::
Input: 'volume;Name:DBStorage01,Environment:Prod'
Output: {'volume': {'Name': 'DBStorage01', 'Environment': 'Prod'}}
"""
value = super().convert(value, param, ctx)
result = {}
try:
main_parts = value.split(self.MAIN_KEY_VALUE_SEPARATOR)
if len(main_parts) != 2:
raise ValueError(
"Expected exactly one main key-value separator (';')"
)
main_key = main_parts[0].strip()
sub_items = main_parts[1].strip()
sub_dict = {}
for item in sub_items.split(self.ITEMS_SEPARATOR):
sub_parts = item.split(self.SUB_KEY_VALUE_SEPARATOR)
if len(sub_parts) != 2:
raise ValueError(
"Expected exactly one sub key-value separator (':')"
)
sub_key = sub_parts[0].strip()
sub_value = sub_parts[1].strip()
sub_dict[sub_key] = sub_value
result[main_key] = sub_dict
except ValueError as e:
raise BadParameter(
f'Wrong format: "{value}". Expected format is: '
f'"main_key1;sub_key1:val1,sub_key2:val2" or similar. '
f'\nError: {str(e)}'
)
return result
def get_metavar(self, param) -> str:
return (
f'MAIN_KEY1{self.MAIN_KEY_VALUE_SEPARATOR}'
f'SUB_KEY1{self.SUB_KEY_VALUE_SEPARATOR}VALUE1{self.ITEMS_SEPARATOR}'
f'SUB_KEY2{self.SUB_KEY_VALUE_SEPARATOR}VALUE2'
)
def check_bundle_bucket_name(ctx, param, value):
try:
from syndicate.core.resources.s3_resource import validate_bucket_name
bucket_name = value
if '/' in value:
bucket_name = value.split('/', 1)[0]
validate_bucket_name(bucket_name)
return value
except ValueError as e:
raise BadParameter(e.__str__())
def check_prefix(ctx, param, value):
extended_prefix = ctx.params.get('extended_prefix')
if value:
value = value.lower().strip()
if extended_prefix:
result = ConfigValidator.validate_extended_prefix(param.name,
value)
else:
result = ConfigValidator.validate_prefix_suffix(param.name, value)
if result:
raise BadParameter(result)
return value
def check_suffix(ctx, param, value):
if value:
value = value.lower().strip()
result = ConfigValidator.validate_prefix_suffix(param.name, value)
if result:
raise BadParameter(result)
return value
def resolve_project_path(ctx, param, value):
from syndicate.core import CONFIG
if not value:
USER_LOG.info(f"Parameter: '{param.name}' wasn't specified. "
f"Getting automatically")
value = CONFIG.project_path \
if CONFIG and CONFIG.project_path else os.getcwd()
USER_LOG.info(f"Path: '{value}' was assigned to the "
f"parameter: '{param.name}'")
return value
def check_lambda_name(value):
"""Validates lambda's name"""
_LOG.info(f"Validating lambda name: '{value}'")
invalid_character = re.search('[^0-9a-zA-Z\-\_]', value)
error = None
if not 3 <= len(value) <= 63:
error = f'lambda name \'{value}\' length must be between 3 and 63 characters'
elif invalid_character:
error = f'lambda name \'{value}\' contains invalid characters: ' \
f'{invalid_character.group()}'
elif value.startswith('-'):
error = f"lambda name '{value}' cannot start with '-'"
elif value.endswith('-'):
error = f"lambda name '{value}' cannot end with '-'"
if error:
_LOG.error(f"Lambda name validation error: {error}")
raise ValueError(error)
_LOG.info(f"Lambda name: '{value}' passed the validation")
def check_lambdas_names(ctx, param, value):
"""Applies lambda name validator for each lambda's name"""
for lambda_name in value:
try:
check_lambda_name(lambda_name)
except ValueError as e:
raise click.BadParameter(e.__str__(), ctx, param)
return value
def check_lambda_layer_name(ctr, param, value):
pattern = r'^[a-zA-Z0-9][a-zA-Z0-9-_]{0,63}$'
errors = []
if len(value) > 64:
errors.append('The length of lambda layer name must be less or equal '
'to 64 character')
if not value[0].isalpha():
errors.append('The first character of the lambda layer name must be '
'a letter')
if not re.match(pattern, value):
errors.append('The lambda layer name must contain only lowercase '
'letters, numbers, underscores and hyphens')
if errors:
raise BadParameter(f'The lambda layer name is invalid. Details:\n'
f'{errors}')
return value
def check_lambda_existence(ctr, param, value):
from syndicate.core import PROJECT_STATE
lambdas = PROJECT_STATE.lambdas
for lambda_name in value:
if lambda_name not in lambdas:
raise BadParameter(f'Lambda with name \'{lambda_name}\' not found. '
f'Please check the lambda name and try again')
return value
def handle_interruption(_num: SIGINT, _frame):
""" Meant to handle interruption signal, by releasing any given lock """
_naming, _lock_types = 'PROJECT_STATE', (MODIFICATION_LOCK, WARMUP_LOCK)
if _num == SIGINT:
from syndicate.core import PROJECT_STATE
_state = PROJECT_STATE
if isinstance(_state, ProjectState):
_locked_type = next((each for each in _lock_types
if not _state.is_lock_free(each)), None)
if _locked_type:
_LOG.warn(f'Releasing the project state lock {_locked_type},'
'due to user interruption.')
_state.release_lock(_locked_type)
sync_project_state()
sys.exit(_num)
def check_lambda_state_consistency(objected_lambdas: list,
subjected_lambdas: dict, runtime: str):
from syndicate.core.groups import RUNTIME
return next((True for each in objected_lambdas if subjected_lambdas.get(
each, {}).get(RUNTIME) == runtime), False)
def delete_none(_dict):
"""Delete None values recursively"""
if isinstance(_dict, dict):
for key, value in list(_dict.items()):
if isinstance(value, (list, dict, tuple, set)):
_dict[key] = delete_none(value)
elif value is None or key is None:
del _dict[key]
elif isinstance(_dict, (list, set, tuple)):
_dict = type(_dict)(
delete_none(item) for item in _dict if item is not None)
return _dict
def without_zip_ext(name: str) -> str:
_zip = '.zip'
if name.endswith(_zip):
name = name[:-len(_zip)]
return name
def zip_ext(name: str) -> str:
_zip = '.zip'
if not name.endswith(_zip):
name = name + _zip
return name
def is_zip_empty(zip_path):
"""Check if a ZIP file is empty"""
try:
with zipfile.ZipFile(zip_path, 'r') as zip_file:
contents = zip_file.namelist()
if not contents:
return True
else:
return False
except zipfile.BadZipFile:
_LOG.info('The file is not a zip file or it is corrupted.')
return True
except FileNotFoundError:
_LOG.info('The zip file does not exist.')
return True
def check_file_extension(ctx, param, value, extensions):
for extension in extensions:
if value is None:
return
if value.lower().endswith(extension):
return value
raise BadParameter(f'Only files with extensions {extensions} are '
f'supported.')
def validate_incompatible_options(ctx, param, value, incompatible_options):
if value:
conflict_options = [option for option in incompatible_options if
ctx.params.get(option)]
if conflict_options:
raise BadParameter(f'Parameter \'{param.name}\' is incompatible '
f'with {conflict_options}')
return value
def validate_authorizer_name_option(ctx, param, value):
if value:
authorization_type = ctx.params.get('authorization_type')
if not authorization_type:
raise BadParameter(f'Parameter \'{param.name}\' can\'t be used '
f'without \'authorization_type\' parameter')
if authorization_type != CUSTOM_AUTHORIZER_KEY:
raise BadParameter(f'Parameter \'{param.name}\' can\'t be used '
f'with \'authorization_type\' '
f'\'{authorization_type}\'')
return value
def validate_api_gw_path(ctx, param, value):
pattern = (r'^/[a-zA-Z0-9-._~+]+(?:(?:/*)?[a-zA-Z0-9-._~]*\{?[a-zA-Z0-9-.'
r'_~]+\}?)*$')
_LOG.debug(f"The parameter '--{param.name}' value is '{value}'")
if os.name == 'nt' and Path(value).is_absolute():
raise BadParameter(
f"Your terminal resolves the parameter '--{param.name}' value as "
f"a filesystem path. Please pass the parameter '--{param.name}' "
f"value without starting slash ('/')")
if not value.startswith('/'):
value = '/' + value
if not re.match(pattern, value):
raise BadParameter(
f"A valid API gateway path must begin with a '/' and can contain "
f"alphanumeric characters, hyphens, periods, underscores or "
"dynamic parameters wrapped in '{}'")
return value
def check_tags(ctx, param, value):
if value:
errors = validate_tags(param.name, value)
if errors:
raise BadParameter(errors)
return value
def validate_tags(key_name, tags_dict):
return ConfigValidator.validate_tags(key_name, tags_dict)
def set_debug_log_level(ctx, param, value):
if value:
loggers = [logging.getLogger(name) for name in
logging.root.manager.loggerDict if
name.startswith('syndicate') or
name.startswith('user-syndicate')]
console_formatter = logging.Formatter(LOG_FORMAT_FOR_CONSOLE)
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
for logger in loggers:
if not logger.isEnabledFor(logging.DEBUG):
logger.setLevel(logging.DEBUG)
if logger.name == 'syndicate':
logger.addHandler(console_handler)
_LOG.debug('The logs level was set to DEBUG')
def verbose_option(func):
@click.option('--verbose', '-v', is_flag=True,
callback=set_debug_log_level, expose_value=False,
is_eager=True, help="Enable logging verbose mode.")
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper