# syndicate/core/project_state/project_state.py

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import getpass import os import re import sys import time from datetime import datetime, timedelta from pathlib import Path, PurePath from typing import Union import yaml from syndicate.commons.log_helper import get_logger from syndicate.core.constants import BUILD_ACTION, \ DEPLOY_ACTION, UPDATE_ACTION, CLEAN_ACTION, PACKAGE_META_ACTION, \ ABORTED_STATUS, SUCCEEDED_STATUS, FAILED_STATUS, FAILED_RETURN_CODE, \ OK_RETURN_CODE, ABORTED_RETURN_CODE, MODIFICATION_OPS from syndicate.core.constants import DATE_FORMAT_ISO_8601 from syndicate.core.groups import RUNTIME_JAVA, RUNTIME_NODEJS, RUNTIME_PYTHON, \ RUNTIME_SWAGGER_UI, RUNTIME_DOTNET, RUNTIME_APPSYNC CAPITAL_LETTER_REGEX = '[A-Z][^A-Z]*' STATE_NAME = 'name' STATE_LOCKS = 'locks' STATE_LAMBDAS = 'lambdas' STATE_BUILD_PROJECT_MAPPING = 'build_projects_mapping' STATE_LOG_EVENTS = 'events' STATE_LATEST_DEPLOY = 'latest_deploy' LOCK_LOCKED_TILL = 'locked_till' LOCK_IS_LOCKED = 'is_locked' LOCK_LAST_MODIFICATION_DATE = 'last_modification_date' LOCK_INITIATOR = 'initiator' MODIFICATION_LOCK = 'modification_lock' WARMUP_LOCK = 'warm_up_lock' PROJECT_STATE_FILE = '.syndicate' INIT_FILE = '__init__.py' INDEX_FILE = 'index.js' BUILD_MAPPINGS = { RUNTIME_JAVA: 'jsrc/main/java', RUNTIME_PYTHON: 'src', RUNTIME_NODEJS: 'app', RUNTIME_DOTNET: 'dnapp', RUNTIME_SWAGGER_UI: 'swagger_src', RUNTIME_APPSYNC: 'appsync_src' } OPERATION_LOCK_MAPPINGS = { 'deploy': MODIFICATION_LOCK } 
KEEP_EVENTS_DAYS = 30 LEAVE_LATEST_EVENTS = 20 _LOG = get_logger(__name__) class ProjectState: def __init__(self, project_path: str = None, dct: dict = None): """In case 'dct' param is given it will be assigned to the internal _dict variable instead of loading from a file. It comes handy when we need to get ProjectState object without loading from file. All the existing functionality remains unimpaired""" if not (project_path or dct): message = 'Either project_path or dct of both must be ' \ 'specified!' _LOG.error(message) raise AssertionError(message) if project_path: self.project_path = project_path self.state_path = os.path.join(project_path, PROJECT_STATE_FILE) self.dct = dct if dct else self.__load_project_state_file() self._current_deploy = None self._current_bundle = None @staticmethod def generate(project_path, project_name): project_state = dict(name=project_name) with open(os.path.join(project_path, PROJECT_STATE_FILE), 'w') as state_file: yaml.dump(project_state, state_file) return ProjectState(project_path=project_path) @staticmethod def check_if_project_state_exists(project_path): return os.path.exists(os.path.join(project_path, PROJECT_STATE_FILE)) @property def dct(self) -> dict: return self._dict @dct.setter def dct(self, dct: dict): self._dict = dct @staticmethod def get_remote() -> 'ProjectState': from syndicate.core import CONN, CONFIG bucket_name = CONFIG.deploy_target_bucket key_compound = PurePath( CONFIG.deploy_target_bucket_key_compound, PROJECT_STATE_FILE ).as_posix() s3 = CONN.s3() remote_project_state = s3.load_file_body(bucket_name=bucket_name, key=key_compound) remote_project_state = yaml.unsafe_load(remote_project_state) _LOG.info(f'Unsafely loaded project state file from S3 bucket. 
The ' f'retrieved object has type: ' f'{type(remote_project_state).__name__}') if isinstance(remote_project_state, dict): remote_project_state = ProjectState(dct=remote_project_state) _LOG.info(f'Made ProjectState object from the dict loaded from S3 ' f'bucket') else: # isinstance(remote_project_state, ProjectState): _LOG.warning(f'Loaded project state object is already instance of ' f'ProjectState. Likely .syndicate from the ' f'the bucket is obsolete. Rewriting...') return remote_project_state def save_to_remote(self, project_state_to_save: 'ProjectState' = None): dict_to_save = project_state_to_save.dct if \ project_state_to_save else self.dct from syndicate.core import CONN, CONFIG bucket_name = CONFIG.deploy_target_bucket key_compound = PurePath(CONFIG.deploy_target_bucket_key_compound, PROJECT_STATE_FILE).as_posix() s3 = CONN.s3() s3.put_object(file_obj=yaml.dump(dict_to_save, sort_keys=False), key=key_compound, bucket=bucket_name, content_type='application/x-yaml') def save(self): with open(self.state_path, 'w') as state_file: yaml.dump(self.dct, state_file, sort_keys=False) @property def name(self): return self.dct.get(STATE_NAME) @property def default_deploy_name(self): import re parts = [] if '_' in self.name: parts.extend(self.name.split('_')) if not parts: parts = re.findall(CAPITAL_LETTER_REGEX, self.name) if not parts: parts = [self.name] return '-'.join([_.lower() for _ in parts]) @name.setter def name(self, name): self.dct.update({STATE_NAME: name}) @property def locks(self): locks = self.dct.get(STATE_LOCKS) if not locks: locks = dict() self.dct.update({STATE_LOCKS: locks}) return locks @property def lambdas(self): lambdas = self.dct.get(STATE_LAMBDAS) if not lambdas: return dict() return lambdas @property def events(self): events = self.dct.get(STATE_LOG_EVENTS) if not events: events = [] self.dct.update({STATE_LOG_EVENTS: events}) return events @events.setter def events(self, events): self.dct.update({STATE_LOG_EVENTS: events}) @property def 
latest_deploy(self): latest_deploy = self.dct.get(STATE_LATEST_DEPLOY) if not latest_deploy: latest_deploy = {} self.dct.update({STATE_LATEST_DEPLOY: latest_deploy}) return latest_deploy @latest_deploy.setter def latest_deploy(self, latest_deploy): self.dct[STATE_LATEST_DEPLOY] = latest_deploy @property def current_deploy(self): return self._current_deploy @current_deploy.setter def current_deploy(self, current_deploy): self._current_deploy = current_deploy @property def current_bundle(self): return self._current_bundle @current_bundle.setter def current_bundle(self, current_bundle): self._current_bundle = current_bundle @property def latest_bundle_name(self): """Returns bundle_name from the one of the latest operations which can guarantee that the bundle is ready""" operations = [BUILD_ACTION, PACKAGE_META_ACTION] for event in self.events: if event.get('operation') in operations: bundle_name = event.get('bundle_name') if bundle_name: return bundle_name @property def latest_deployed_bundle_name(self): return self.latest_deploy.get('bundle_name') @property def latest_deployed_deploy_name(self): return self.latest_deploy.get('deploy_name') def _get_attribute_from_latest_operation(self, operation_name, attribute): events = self.events event = next((event for event in events if event.get('operation') == operation_name), None) if event: return event.get(attribute) @property def latest_modification(self): events = self.events latest = next((event for event in events if event.get('operation') in MODIFICATION_OPS), None) return latest def get_latest_deployed_or_updated_bundle( self, bundle_name=None, latest_if_not_found=False): """ Retrieve the latest deployed or updated bundle. If `bundle_name` is provided, it returns the latest event for that specific bundle. If `bundle_name` is None, it returns the latest event across all operations. :latest_if_not_found: - If True, the method will retry fetching the latest event without reference to the bundle name. 
""" if bundle_name: modification_ops = [DEPLOY_ACTION, UPDATE_ACTION, CLEAN_ACTION] else: modification_ops = [DEPLOY_ACTION, UPDATE_ACTION] filtered_events = ( event for event in self.events if self._is_event_matching(event, bundle_name, modification_ops) ) latest_event = next(filtered_events, None) if latest_event and latest_event.get('operation') == CLEAN_ACTION: # in case bundle was deleted manually but present in .syndicate return self.get_latest_deployed_or_updated_bundle() if not latest_event and latest_if_not_found: return self.get_latest_deployed_or_updated_bundle() return latest_event.get('bundle_name') if latest_event else None @staticmethod def _is_event_matching(event, bundle_name, modification_ops): matches_operation = event.get('operation') in modification_ops matches_bundle_name = bundle_name is None or event.get( 'bundle_name') == bundle_name status = event.get('status') != ABORTED_STATUS return matches_operation and matches_bundle_name and status def is_lock_free(self, lock_name=None, lock_config=None): lock = lock_config or self.locks.get(lock_name) if not lock: return True elif not lock.get(LOCK_IS_LOCKED): return True elif locked_till := lock.get(LOCK_LOCKED_TILL): locked_till_datetime = datetime.strptime( locked_till, DATE_FORMAT_ISO_8601) if locked_till_datetime <= datetime.utcnow(): lock[LOCK_LOCKED_TILL] = None lock[LOCK_IS_LOCKED] = False return True return False def acquire_lock(self, lock_name): self.__modify_lock_state(lock_name, True) def release_lock(self, lock_name): self.__modify_lock_state(lock_name, False) def actualize_locks(self, other_project_state): locks = self.locks other_locks = other_project_state.locks all_lock_names = set(locks.keys()).union(set(other_locks.keys())) for lock_name in all_lock_names: lock = locks.get(lock_name) other_lock = other_locks.get(lock_name) if lock is None: locks.update({lock_name: other_lock}) elif other_lock is None: other_locks.update({lock_name: lock}) elif (not 
self.is_lock_free(lock_config=other_lock) and getpass.getuser() != other_lock.get(LOCK_INITIATOR)): locks.update({lock_name: other_lock}) elif (lock.get(LOCK_LAST_MODIFICATION_DATE) < other_lock.get(LOCK_LAST_MODIFICATION_DATE)): locks.update({lock_name: other_lock}) else: other_locks.update({lock_name: lock}) def actualize_latest_deploy(self, other_project_state: 'ProjectState'): local_deploy = self.latest_deploy remote_deploy = other_project_state.latest_deploy if local_deploy and remote_deploy: local_time_start = datetime.strptime(local_deploy['time_start'], DATE_FORMAT_ISO_8601) remote_time_start = datetime.strptime(remote_deploy['time_start'], DATE_FORMAT_ISO_8601) if remote_time_start > local_time_start: self.latest_deploy = remote_deploy elif remote_deploy: self.latest_deploy = remote_deploy def refresh_lambda_state(self): """ Refreshes current Project lambda State, be resolving the compatibility with the retained state. Given any consistency conflict the ProjectState is re-persisted. :return: None """ from syndicate.core.generators.lambda_function import \ resolve_lambda_path from syndicate.core.helper import check_lambda_state_consistency _persistence_need: bool = False _project_path = Path(self.project_path) _stale_lambdas = self.lambdas for runtime, source in BUILD_MAPPINGS.items(): _path = resolve_lambda_path(_project_path, runtime, source) _updated = self._update_lambdas_from_path(_path, runtime) if not _persistence_need and check_lambda_state_consistency( objected_lambdas=_updated, subjected_lambdas=_stale_lambdas, runtime=runtime ): _persistence_need = True if _persistence_need: self.save() def _update_lambdas_from_path(self, path: Union[str, Path], runtime: str): """ Non persistently updates ProjectState runtime and any found lambdas from a given path. 
:parameter path:Path :parameter runtime: str :return: List """ try: path = path if isinstance(path, Path) else Path(path) except (TypeError, Exception): _LOG.error(f'Requested path {path} must be of str or Path type.') return [] _LOG.info(f'Going to resolve any lambda names from a given path: ' f'{path.absolute()}.') _lambdas: list = self._resolve_lambdas_from_path(path, runtime) for name in self._resolve_lambdas_from_path(path, runtime): _LOG.info(f'Going to add the following \'{runtime}\' lambda:' f'\'{name}\' to the pending ProjectState.') self.add_lambda(lambda_name=name, runtime=runtime) if _lambdas: self.add_project_build_mapping(runtime) return _lambdas def add_lambda(self, lambda_name, runtime): lambdas = self.dct.get(STATE_LAMBDAS) if not lambdas: lambdas = dict() self.dct.update({STATE_LAMBDAS: lambdas}) lambdas.update({lambda_name: {'runtime': runtime}}) def add_project_build_mapping(self, runtime): build_project_mappings = self.dct.get(STATE_BUILD_PROJECT_MAPPING) if not build_project_mappings: build_project_mappings = dict() self.dct.update( {STATE_BUILD_PROJECT_MAPPING: build_project_mappings}) build_mapping = BUILD_MAPPINGS.get(runtime) build_project_mappings.update({runtime: build_mapping}) def load_project_build_mapping(self): return self.dct.get(STATE_BUILD_PROJECT_MAPPING) def log_execution_event(self, **kwargs): operation = kwargs.get('operation') status = kwargs.get('status') rollback_on_error = kwargs.get('rollback_on_error') valid_statuses = { FAILED_RETURN_CODE, OK_RETURN_CODE, ABORTED_RETURN_CODE } if status not in valid_statuses: kwargs.pop('status', None) else: status = { OK_RETURN_CODE: True, FAILED_RETURN_CODE: False, ABORTED_RETURN_CODE: ABORTED_STATUS, None: None }.get(status) if operation == DEPLOY_ACTION: params = kwargs.copy() params.pop('operation', None) params.pop('status', None) params['is_succeeded'] = status if params['is_succeeded'] != ABORTED_STATUS: if not (status is False and rollback_on_error is True): 
params.pop('rollback_on_error') self._set_latest_deploy_info(**params) if operation == CLEAN_ACTION and status is True: self._delete_latest_deploy_info() match status: case True: kwargs['status'] = SUCCEEDED_STATUS case False: kwargs['status'] = FAILED_STATUS case status if status == ABORTED_STATUS: kwargs['status'] = ABORTED_STATUS kwargs = { key: value for key, value in kwargs.items() if value is not None } self.events.insert(0, kwargs) self.__save_events() def _set_latest_deploy_info(self, **kwargs): kwargs = { key: value for key, value in kwargs.items() if value is not None } self.latest_deploy = kwargs def _delete_latest_deploy_info(self): self.latest_deploy = {} remote_project_state = ProjectState.get_remote() remote_project_state.latest_deploy = {} self.save_to_remote(project_state_to_save=remote_project_state) def add_execution_events(self, events): for event in events: if event not in self.events: self.events.append(event) self.events.sort(key=lambda e: e.get('time_end'), reverse=True) self.__save_events() def __modify_lock_state(self, lock_name, locked): from syndicate.core import CONFIG locked_till = CONFIG.lock_lifetime_minutes locks = self.locks lock = locks.get(lock_name) modification_datetime = datetime.utcnow() timestamp = modification_datetime.strftime(DATE_FORMAT_ISO_8601) locked_till_timestamp = (modification_datetime + timedelta(minutes=locked_till)).strftime( DATE_FORMAT_ISO_8601) modified_lock = { LOCK_LAST_MODIFICATION_DATE: timestamp, LOCK_IS_LOCKED: locked, LOCK_INITIATOR: getpass.getuser() } if locked: modified_lock[LOCK_LOCKED_TILL] = locked_till_timestamp if lock: if not locked: lock.pop(LOCK_LOCKED_TILL, None) lock.update(modified_lock) else: locks.update({lock_name: modified_lock}) self.save() def __load_project_state_file(self): if not ProjectState.check_if_project_state_exists(self.project_path): raise AssertionError( f'There is no .syndicate file in {self.project_path}') with open(self.state_path) as state_file: return 
yaml.safe_load(state_file.read()) def __save_events(self): current_time = datetime.fromtimestamp(time.time()) index_out_days = None for i, event in enumerate(self.events): if (current_time - datetime.strptime(event.get('time_end'), DATE_FORMAT_ISO_8601)).days > KEEP_EVENTS_DAYS: index_out_days = i break if index_out_days: if index_out_days >= LEAVE_LATEST_EVENTS: self.events = self.events[:index_out_days] else: self.events = self.events[:LEAVE_LATEST_EVENTS] self.save() @staticmethod def build_from_structure(config): """Builds project state file from existing project folder in case of moving from older versions :type config: syndicate.core.conf.processor.ConfigHolder """ from syndicate.core.generators.lambda_function import \ resolve_lambda_path absolute_path = config.project_path project_path = Path(absolute_path) project_state = ProjectState.generate(project_path=absolute_path, project_name=project_path.name) for runtime, source_path in BUILD_MAPPINGS.items(): lambdas_path = resolve_lambda_path(project_path, runtime, source_path) if os.path.exists(lambdas_path): project_state.add_project_build_mapping(runtime) project_state._update_lambdas_from_path(lambdas_path, runtime) project_state.save() return project_state @staticmethod def _resolve_lambdas_from_path(path: Path, runtime: str): """ Resolves a list of names bound to lambda functions, retained inside a given path, based on a provided runtime. 
:parameter path: Path :parameter runtime: str :return: List[str] """ lambda_list = [] _java_lambda_regex = 'lambdaName\s*=\s*"(.+)"' if not path.exists(): return lambda_list for item in path.iterdir(): if runtime == RUNTIME_JAVA: if not item.is_file(): continue try: match = re.search(_java_lambda_regex, item.read_text()) if match: lambda_list.append(match.group(1)) except (OSError, Exception): print("Couldn't retrieve lambda name from the java " "lambda by path: {}".format(item.absolute()), file=sys.stderr) else: if (item/INIT_FILE).exists() or (item/INDEX_FILE).exists(): lambda_list.append(item.name) return lambda_list