patch_scripts/3.3.0/patch_parent_scope.py (275 lines of code) (raw):

import argparse import json import logging import logging.config import sys import uuid from abc import ABC, abstractmethod from pathlib import Path from typing import Callable, List, Tuple, Generator, Optional from modular_sdk.commons.constants import ALLOWED_TENANT_PARENT_MAP_KEYS, \ ParentScope, COMPOUND_KEYS_SEPARATOR from modular_sdk.commons.time_helper import java_timestamp from modular_sdk.models.parent import Parent from modular_sdk.models.tenant import Tenant ACTION_DESTINATION = 'action' PATCH_ALL_SCOPE_ACTION = 'all_scope' PATCH_SPECIFIC_SCOPE_ACTION = 'specific_scope' QUESTION1 = 'Do you really want to patch the parent? Although it has scope ' \ 'SPECIFIC multiple tenants can be linked to it. If you proceed ' \ 'the patch only the linkage with {tenant} will remain. ' \ 'Others WILL be destroyed!' QUESTION2 = 'Parent has multiple clouds in its scope. A new parent for each ' \ 'cloud will be created. Do you agree?' def get_logger(): config = { 'version': 1, 'disable_existing_loggers': True } logging.config.dictConfig(config) logger = logging.getLogger() handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s')) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger class ColorLog: def __init__(self): self._log = get_logger() def info(self, msg): self._log.info(TermColor.green(msg)) def debug(self, msg): self._log.debug(TermColor.gray(msg)) def warning(self, msg): self._log.warning(TermColor.yellow(msg)) def error(self, msg): self._log.error(TermColor.red(msg)) _LOG = ColorLog() class TermColor: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' DEBUG = '\033[90m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' @classmethod def blue(cls, st: str) -> str: return f'{cls.OKBLUE}{st}{cls.ENDC}' @classmethod def cyan(cls, st: str) -> str: return f'{cls.OKCYAN}{st}{cls.ENDC}' @classmethod def green(cls, st: str) -> str: return f'{cls.OKGREEN}{st}{cls.ENDC}' @classmethod def yellow(cls, st: str) -> str: return f'{cls.WARNING}{st}{cls.ENDC}' @classmethod def red(cls, st: str) -> str: return f'{cls.FAIL}{st}{cls.ENDC}' @classmethod def gray(cls, st: str) -> str: return f'{cls.DEBUG}{st}{cls.DEBUG}' class ParentsCol: def __init__(self, filename: str): self._filename = Path(filename) def add(self, pid: str): if not self._filename.exists(): data = set() else: with open(self._filename, 'r') as file: data = set(json.load(file)) data.add(pid) with open(self._filename, 'w') as file: json.dump(list(data), file) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description='Parent scope patch entrypoint' ) sub_parsers = parser.add_subparsers( dest=ACTION_DESTINATION, required=True, help='Available actions' ) _ = sub_parsers.add_parser( PATCH_ALL_SCOPE_ACTION, help='Patch parents with scope ALL' ) patch_specific = sub_parsers.add_parser( PATCH_SPECIFIC_SCOPE_ACTION, help='Patch parents with type SPECIFIC' ) patch_specific.add_argument( '--tenant_names', nargs='+', required=True, type=str, help='Tenants to patch their specific scope' ) patch_specific.add_argument( '--types', nargs='+', required=False, type=str, default=[], choices=ALLOWED_TENANT_PARENT_MAP_KEYS ) return parser def query_yes_no(question, default="yes") -> bool: """Ask a yes/no question via raw_input() and return their answer. "question" is a string that is presented to the user. "default" is the presumed answer if the user just hits <Enter>. It must be "yes" (the default), "no" or None (meaning an answer is required of the user). The "answer" return value is True for "yes" or False for "no". """ valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default) while True: sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == "": return valid[default] elif choice in valid: return valid[choice] else: sys.stdout.write( "Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") class ActionHandler(ABC): @abstractmethod def __call__(self, **kwargs): ... class PatchAllScope(ActionHandler): @staticmethod def divide_by_cloud(parent: Parent) -> Generator[Parent, None, None]: """ Creates a new parent with the same business logic :param parent: :return: """ meta = parent.meta.as_dict() clouds = meta.pop('clouds', None) or [] # they must be meta.pop('scope', None) # ALL for cloud in clouds: yield Parent( parent_id=str(uuid.uuid4()), customer_id=parent.customer_id, application_id=parent.application_id, type=parent.type, description=parent.description, meta=meta, is_deleted=False, creation_timestamp=int(java_timestamp()), type_scope=f'{parent.type}{COMPOUND_KEYS_SEPARATOR}{ParentScope.ALL}{COMPOUND_KEYS_SEPARATOR}{cloud}' # noqa ) def __call__(self): old_parents = ParentsCol('old_parents.json') new_parents = ParentsCol('new_parents.json') it = Parent.scan( rate_limit=1, filter_condition=(Parent.meta['scope'] == ParentScope.ALL.value) ) for parent in it: _LOG.info(f'Going to patch ' f'parent: {parent.parent_id}:{parent.type}') clouds = parent.meta.as_dict().get('clouds') or [] if not clouds: _LOG.info('Parent has no clouds in its scope. ' 'Patching with no clouds') parent.update(actions=[ Parent.type_scope.set( f'{parent.type}{COMPOUND_KEYS_SEPARATOR}{ParentScope.ALL}{COMPOUND_KEYS_SEPARATOR}') # noqa ]) elif len(clouds) == 1: _LOG.info('Parent has one cloud in its scope. ' 'Patching with one clouds') parent.update(actions=[ Parent.type_scope.set( f'{parent.type}{COMPOUND_KEYS_SEPARATOR}{ParentScope.ALL}{COMPOUND_KEYS_SEPARATOR}{clouds[0]}') # noqa ]) elif len(clouds) == 3: # all clouds _LOG.info(f'Parent contains all the ' f'clouds: {", ".join(clouds)}. Can be patched') parent.update(actions=[ Parent.type_scope.set( f'{parent.type}{COMPOUND_KEYS_SEPARATOR}{ParentScope.ALL}{COMPOUND_KEYS_SEPARATOR}') # noqa ]) else: # multiple clouds but not all if not query_yes_no(TermColor.blue(QUESTION2)): _LOG.info('Skipping patch') continue old_parents.add(parent.parent_id) _LOG.info( f'Parent with id {parent.parent_id} won`t be changed') for copy in self.divide_by_cloud(parent): _LOG.info( f'Creating a new parent with id {copy.parent_id}') copy.save() new_parents.add(copy.parent_id) _LOG.info(f'Parent {parent.parent_id} was patched') class PatchSpecificScope(ActionHandler): def __init__(self): self._parent_cache = {} @staticmethod def iter_tenant_parents(tenant: Tenant, types: List[str] ) -> Generator[str, None, None]: if not types: yield from filter( lambda x: bool(x), tenant.parent_map.as_dict().values() ) else: pid = tenant.parent_map.as_dict() for t in types: if pid.get(t): yield pid[t] def get_parent(self, pid: str) -> Optional[Parent]: if pid in self._parent_cache: return self._parent_cache[pid] parent = Parent.get_nullable(pid) if parent: self._parent_cache[pid] = parent return parent def __call__(self, tenant_names: Tuple[str, ...], types: List[str]): old_parents = ParentsCol('old_parents.json') new_parents = ParentsCol('new_parents.json') _LOG.info('Going to patch specific scopes') if not types: _LOG.warning('Concrete types are not provided. All the found ' 'types will be patched') for name in tenant_names: tenant = Tenant.get_nullable(name) if not tenant: _LOG.warning(f'{name} - not found. Skipping') continue _LOG.info(f'Going to patch tenant: {name}') for parent_id in self.iter_tenant_parents(tenant, types): _LOG.info(f'Going to patch parent: {parent_id} for ' f'tenant: {name}') parent = self.get_parent(parent_id) if not parent: _LOG.warning('Parent not found. Skipping') continue old_parents.add(parent_id) meta = parent.meta.as_dict() meta.pop('scope', None) meta.pop('clouds', None) copy = Parent( parent_id=str(uuid.uuid4()), customer_id=parent.customer_id, application_id=parent.application_id, type=parent.type, description=parent.description, meta=meta, is_deleted=False, creation_timestamp=int(java_timestamp()), type_scope=f'{parent.type}{COMPOUND_KEYS_SEPARATOR}{ParentScope.SPECIFIC}{COMPOUND_KEYS_SEPARATOR}{tenant.name}' ) new_parents.add(copy.parent_id) _LOG.info(f'A new specific parent: ' f'{copy.parent_id}:{copy.type} will be created') copy.save() def main(): parser = build_parser() arguments = parser.parse_args() action = getattr(arguments, 'action', None) # action to handler mapping = { PATCH_ALL_SCOPE_ACTION: PatchAllScope(), PATCH_SPECIFIC_SCOPE_ACTION: PatchSpecificScope() } # None is root action func: Callable = mapping.get(action) or ( lambda **kwargs: _LOG.error('No handler')) if hasattr(arguments, 'action'): delattr(arguments, 'action') func(**vars(arguments)) if __name__ == '__main__': main()