workflows/pipe-common/scripts/manage_sge_profiles.py (462 lines of code) (raw):
# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import click
import datetime
import psutil
import sys
import time
from pipeline.api import PipelineAPI, APIError
from pipeline.log.logger import LocalLogger, RunLogger, TaskLogger, LevelLogger, ExplicitLogger, ResilientLogger
from pipeline.utils.path import mkdir
from pipeline.utils.ssh import LocalExecutor, LoggingExecutor, ExecutorError
from scripts.generate_sge_profiles import generate_sge_profiles, \
PROFILE_QUEUE_FORMAT, PROFILE_AUTOSCALING_FORMAT, PROFILE_QUEUE_PATTERN
Profile = namedtuple('Profile', 'name,path_queue,path_autoscaling')
PROFILE_NAME_REMOVAL_PATTERN = r'[^a-zA-Z0-9.]+'
GROUP_ADMIN = 'ROLE_ADMIN'
DEFAULT_ALLOWED_GROUPS = 'ROLE_ADMIN,ROLE_ADVANCED_USER'
class ProfileError(RuntimeError):
pass
class ProfileManager:
__metaclass__ = ABCMeta
@abstractmethod
def create(self, profile_name):
pass
@abstractmethod
def configure(self, profile_name):
pass
@abstractmethod
def restart(self, profile_name):
pass
@abstractmethod
def list(self):
pass
@abstractmethod
def export(self, profile_names, output_path):
pass
class ProfileManagerDecorator(ProfileManager):
def __init__(self, inner):
self._inner = inner
def create(self, profile_name):
self._inner.create(profile_name)
def configure(self, profile_name):
self._inner.configure(profile_name)
def restart(self, profile_name):
self._inner.restart(profile_name)
def list(self):
self._inner.list()
def export(self, profile_names, output_path):
self._inner.export(profile_names, output_path)
class ResilientProfileManager:
def __init__(self, inner, logger):
self._inner = inner
self._logger = logger
def __getattr__(self, name):
if not hasattr(self._inner, name):
return None
attr = getattr(self._inner, name)
if not callable(attr):
return attr
return self._wrap(attr)
def _wrap(self, attr):
@functools.wraps(attr)
def _wrapped_attr(*args, **kwargs):
try:
return attr(*args, **kwargs)
except KeyboardInterrupt:
self._logger.warning('Interrupted.')
except ProfileError:
exit(1)
except Exception:
self._logger.warning('Grid engine profiles management has failed.', trace=True)
exit(1)
return _wrapped_attr
class SecurityProfileManager:
def __init__(self, inner, api, logger, allowed_groups):
self._inner = inner
self._api = api
self._logger = logger
self._allowed_groups = allowed_groups
def __getattr__(self, name):
if not hasattr(self._inner, name):
return None
attr = getattr(self._inner, name)
if not callable(attr):
return attr
return self._wrap(attr)
def _wrap(self, attr):
@functools.wraps(attr)
def _wrapped_attr(*args, **kwargs):
self._logger.debug('Loading current user...')
user = self._api.load_current_user_efficiently() or {}
user_groups = list(user.get('groups') or [])
user_roles = [role.get('name') for role in user.get('roles') or []]
if not any(group in self._allowed_groups for group in user_groups + user_roles):
self._logger.error('Access denied. Only users with explicit rights have access to sge utility. '
'Please contact the support team for the access.')
raise ProfileError()
return attr(*args, **kwargs)
return _wrapped_attr
class BackupProfileManager(ProfileManagerDecorator):
def __init__(self, inner, logger, path):
super(BackupProfileManager, self).__init__(inner)
self._inner = inner
self._logger = logger
self._path = path
self._template = 'sge_export_{}.tar.gz'
self._datetime_format = "%Y_%m_%d_%H_%M_%S"
def create(self, profile_name):
self._inner.create(profile_name)
self._backup()
def configure(self, profile_name):
self._inner.configure(profile_name)
self._backup()
def export(self, profile_names, output_path):
self._inner.export(profile_names, output_path)
self._backup(output_path)
def _backup(self, output_path=None):
backup_path = self._backup_path()
if output_path:
self._logger.info('Copying to {}...'.format(backup_path))
mkdir(os.path.dirname(backup_path))
shutil.copyfile(output_path, backup_path)
else:
self._inner.export(profile_names=[], output_path=backup_path)
def _backup_path(self):
return os.path.join(self._path, self._template.format(datetime.datetime.now().strftime(self._datetime_format)))
class GridEngineProfileManager(ProfileManager):
def __init__(self, executor, logger, logger_warning, cap_scripts_dir, queue_profile_regexp,
autoscaling_script_path):
self._executor = executor
self._logger = logger
self._logger_warning = logger_warning
self._cap_scripts_dir = cap_scripts_dir
self._queue_profile_regexp = queue_profile_regexp
self._autoscaling_script_path = autoscaling_script_path
def create(self, profile_name):
self._logger.info('Initiating grid engine profiles creation...')
editor = self._find_editor()
profile_name = self._preprocess_profile_name(profile_name)
profile = self._find_profile(self._cap_scripts_dir, self._queue_profile_regexp, profile_name)
if profile:
self._logger.warning('Grid engine {} profile already exists.'.format(profile_name))
raise ProfileError()
self._generate_profile(profile_name)
profile = self._find_profile(self._cap_scripts_dir, self._queue_profile_regexp, profile_name)
if not profile:
self._logger.warning('Grid engine {} profile does not exist.'.format(profile_name))
raise ProfileError()
self._create_queue(profile)
modified_profile = self._configure_profile(profile, editor)
verified = True
if modified_profile:
verified = self._verify_profile(modified_profile, self._autoscaling_script_path)
if verified:
self._persist_profile(modified_profile, profile)
self._launch_autoscaler(profile, self._autoscaling_script_path)
if not verified:
raise ProfileError()
def configure(self, profile_name):
self._logger.info('Initiating grid engine profile configuration...')
editor = self._find_editor()
profile = self._find_profile(self._cap_scripts_dir, self._queue_profile_regexp, profile_name)
if not profile:
self._logger.warning('Grid engine {} profile does not exist.'.format(profile_name))
raise ProfileError()
modified_profile = self._configure_profile(profile, editor)
verified = True
if modified_profile:
self._stop_autoscaler(profile, self._autoscaling_script_path)
verified = self._verify_profile(modified_profile, self._autoscaling_script_path)
if verified:
self._persist_profile(modified_profile, profile)
self._launch_autoscaler(profile, self._autoscaling_script_path)
if not verified:
raise ProfileError()
def restart(self, profile_name):
self._logger.info('Initiating grid engine profile restart...')
profile = self._find_profile(self._cap_scripts_dir, self._queue_profile_regexp, profile_name)
if not profile:
self._logger.warning('Grid engine {} profile does not exist.'.format(profile_name))
raise ProfileError()
self._stop_autoscaler(profile, self._autoscaling_script_path)
self._launch_autoscaler(profile, self._autoscaling_script_path)
def list(self):
self._logger.info('Initiating grid engine profiles listing...')
profiles = self._collect_profiles(self._cap_scripts_dir, self._queue_profile_regexp)
for profile in profiles:
self._logger.info('Grid engine profile has been found: {}'.format(profile.name))
def export(self, profile_names, output_path):
self._logger.info('Initiating grid engine profiles export...')
if profile_names:
profiles = list(self._get_profiles(profile_names))
else:
profiles = list(self._collect_profiles(self._cap_scripts_dir, self._queue_profile_regexp))
tmp_dir = tempfile.mkdtemp()
self._export(profiles, tmp_dir)
self._archive(tmp_dir, output_path)
def _export(self, profiles, export_dir):
self._logger.info('Exporting to {}...'.format(export_dir))
self._export_queues(export_dir)
self._export_pes(export_dir)
self._export_profiles(profiles, export_dir)
def _export_queues(self, export_dir):
self._logger.info('Exporting grid engine queues...')
output_dir = os.path.join(export_dir, 'cqueues')
mkdir(output_dir)
self._executor.execute("""
for name in $(qconf -sql); do
qconf -sq "$name" > "{output_dir}/$name"
done
""".format(output_dir=output_dir))
def _export_pes(self, export_dir):
self._logger.info('Exporting grid engine pes...')
output_dir = os.path.join(export_dir, 'pe')
mkdir(output_dir)
self._executor.execute("""
for name in $(qconf -spl); do
qconf -sp "$name" > "{output_dir}/$name"
done
""".format(output_dir=output_dir))
def _export_profiles(self, profiles, export_dir):
output_dir = os.path.join(export_dir, 'profiles')
mkdir(output_dir)
for profile in profiles:
self._logger.info('Exporting grid engine {} profile...'.format(profile.name))
path_queue = os.path.join(output_dir, os.path.basename(profile.path_queue))
path_autoscaling = os.path.join(output_dir, os.path.basename(profile.path_autoscaling))
shutil.copyfile(profile.path_queue, path_queue)
shutil.copyfile(profile.path_autoscaling, path_autoscaling)
def _archive(self, export_dir, output_path):
output_path = os.path.abspath(output_path)
self._logger.info('Archiving to {}...'.format(output_path))
mkdir(os.path.dirname(output_path))
with tarfile.open(output_path, 'w:gz') as tar:
for item in os.listdir(export_dir):
item_path = os.path.join(export_dir, item)
tar.add(item_path, arcname=os.path.basename(item_path))
def _find_editor(self):
self._logger.debug('Searching for editor...')
default_editor = os.getenv('VISUAL', os.getenv('EDITOR'))
fallback_editors = ['nano', 'vi', 'vim']
editors = [default_editor] + fallback_editors if default_editor else fallback_editors
editor = None
for potential_editor in editors:
try:
subprocess.check_output('command -v ' + potential_editor, shell=True, stderr=subprocess.STDOUT)
editor = potential_editor
self._logger.debug('Editor {} has been found.'.format(editor))
break
except subprocess.CalledProcessError as e:
self._logger.debug('Editor {} has not been found because {}.'
.format(potential_editor, e.output or 'the tool is not installed'))
if not editor:
self._logger.warning('Grid engine profiles configuration requires a text editor to be installed locally.\n'
'Please set VISUAL/EDITOR environment variable or install vi/vim/nano using one of the commands below.\n\n'
'yum install -y nano\n'
'apt-get install -y nano\n\n')
raise ProfileError()
return editor
def _preprocess_profile_name(self, profile_name):
self._logger.debug('Preprocessing profile name...')
profile_name = re.sub(PROFILE_NAME_REMOVAL_PATTERN, '', profile_name.lower())
if not profile_name:
self._logger.warning('Grid engine profile name should consist only of alphanumeric characters and dots.')
raise ProfileError()
if not profile_name.endswith('.q'):
profile_name = profile_name + '.q'
return profile_name
def _find_profile(self, cap_scripts_dir, queue_profile_regexp, profile_name):
profiles = self._collect_profiles(cap_scripts_dir, queue_profile_regexp)
for profile in profiles:
if profile.name == profile_name:
return profile
def _get_profiles(self, profile_names):
for profile_name in profile_names:
profile = self._find_profile(self._cap_scripts_dir, self._queue_profile_regexp, profile_name)
if not profile:
self._logger.warning('Grid engine {} profile does not exist.'.format(profile_name))
raise ProfileError()
yield profile
def _collect_profiles(self, cap_scripts_dir, profile_regexp):
self._logger.debug('Collecting existing profiles...')
for profile_name in os.listdir(cap_scripts_dir):
profile_match = profile_regexp.match(profile_name)
if not profile_match:
continue
queue_name = profile_match.group(1)
self._logger.debug('Profile {} has been collected.'.format(queue_name))
yield Profile(name=queue_name,
path_queue=os.path.join(cap_scripts_dir, PROFILE_QUEUE_FORMAT.format(queue_name)),
path_autoscaling=os.path.join(cap_scripts_dir, PROFILE_AUTOSCALING_FORMAT.format(queue_name)))
def _generate_profile(self, profile_name):
profile_index = str(int(time.time()))
self._logger.debug('Creating grid engine {} profile...'.format(profile_name))
os.environ['CP_CAP_SGE_QUEUE_NAME_{}'.format(profile_index)] = profile_name
generate_sge_profiles()
self._logger.info('Grid engine {} profile has been created.'.format(profile_name))
def _configure_profile(self, profile, editor):
tmp_profile = Profile(name=profile.name,
path_queue=tempfile.mktemp(),
path_autoscaling=tempfile.mktemp())
self._logger.debug('Copying grid engine {} profile to {}...'.format(profile.name, tmp_profile.path_autoscaling))
shutil.copy2(profile.path_autoscaling, tmp_profile.path_autoscaling)
shutil.copy2(profile.path_queue, tmp_profile.path_queue)
self._replace_in_file(tmp_profile.path_queue, profile.path_autoscaling, tmp_profile.path_autoscaling)
self._logger.debug('Modifying temporary grid engine {} profile...'.format(profile.name))
subprocess.check_call([editor, tmp_profile.path_autoscaling])
self._logger.debug('Extracting changes from grid engine {} profile...'.format(profile.name))
profile_changes = self._compare_file(profile.path_autoscaling, tmp_profile.path_autoscaling)
if not profile_changes:
self._logger.info('Grid engine {} profile has not been changed.'.format(profile.name))
return None
self._logger.info('Grid engine {} profile has been changed:\n{}'.format(profile.name, profile_changes))
return tmp_profile
def _replace_in_file(self, file_path, before, after):
with open(file_path, 'r') as file:
content = file.read()
updated_content = re.sub(re.escape(before), re.escape(after), content)
with open(file_path, 'w') as file:
file.write(updated_content)
def _compare_file(self, before_path, after_path):
try:
return subprocess.check_output(['diff', before_path, after_path], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
return e.output
def _persist_profile(self, modified_profile, profile):
self._logger.debug('Persisting grid engine {} profile changes...'.format(profile.name))
shutil.move(modified_profile.path_autoscaling, profile.path_autoscaling)
def _create_queue(self, profile):
self._logger.debug('Creating grid engine {} queue...'.format(profile.name))
subprocess.check_call("""
source "{autoscaling_profile_path}"
sge_setup_queue
""".format(autoscaling_profile_path=profile.path_queue),
shell=True)
self._logger.info('Grid engine {} queue has been created.'.format(profile.name))
def _stop_autoscaler(self, profile, autoscaling_script_path):
self._logger.debug('Searching for {} autoscaler processes...'.format(profile.name))
for proc in self._get_processes(autoscaling_script_path, CP_CAP_SGE_QUEUE_NAME=profile.name):
self._logger.debug('Stopping process #{}...'.format(proc.pid))
proc.terminate()
def _get_processes(self, *args, **kwargs):
for proc in psutil.process_iter():
try:
proc_cmdline = proc.cmdline()
proc_environ = proc.environ()
except Exception:
self._logger.error('Please use root account to configure grid engine profiles.')
raise ProfileError()
if all(arg in proc_cmdline for arg in args) \
and all(proc_environ.get(k) == v for k, v in kwargs.items()):
yield proc
def _verify_profile(self, profile, autoscaling_script_path):
self._logger.debug('Verifying grid engine queue {} autoscaling...'.format(profile.name))
try:
self._executor.execute("""
export CP_CAP_AUTOSCALE_LOGGING_LEVEL_RUN="ERROR"
export CP_CAP_AUTOSCALE_LOGGING_LEVEL_FILE="ERROR"
export CP_CAP_AUTOSCALE_LOGGING_LEVEL_CONSOLE="INFO"
export CP_CAP_AUTOSCALE_LOGGING_FORMAT="%(message)s"
export CP_CAP_AUTOSCALE_DRY_INIT="true"
source "{autoscaling_profile_path}"
"$CP_PYTHON2_PATH" "{autoscaling_script_path}"
""".format(autoscaling_profile_path=profile.path_queue,
autoscaling_script_path=autoscaling_script_path),
logger=self._logger_warning)
self._logger.debug('Grid engine {} autoscaling has been verified.'.format(profile.name))
return True
except ExecutorError:
self._logger.warning('Grid engine profile verification has failed. Reverting the changes...')
return False
def _launch_autoscaler(self, profile, autoscaling_script_path):
self._logger.debug('Launching grid engine queue {} autoscaling...'.format(profile.name))
self._executor.execute("""
source "{autoscaling_profile_path}"
nohup "$CP_PYTHON2_PATH" "{autoscaling_script_path}" >"$LOG_DIR/.nohup.autoscaler.sge.$CP_CAP_SGE_QUEUE_NAME.log" 2>&1 &
""".format(autoscaling_profile_path=profile.path_queue,
autoscaling_script_path=autoscaling_script_path))
self._logger.info('Grid engine {} autoscaling has been launched.'.format(profile.name))
def _get_manager():
logging_level_run = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_LEVEL_RUN', default='INFO')
logging_level_file = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_LEVEL_FILE', default='DEBUG')
logging_level_console = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_LEVEL_CONSOLE', default='INFO')
logging_format_file = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_FORMAT_FILE', default='%(asctime)s [%(levelname)s] %(message)s')
logging_format_console = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_FORMAT_CONSOLE', default='%(message)s')
logging_task = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_TASK', default='SGEProfiles')
logging_dir = os.getenv('CP_CAP_SGE_UTILITY_LOG_DIR', default=os.getenv('LOG_DIR', '/var/log'))
logging_file = os.getenv('CP_CAP_SGE_UTILITY_LOGGING_FILE', default='sge.profiles.log')
backup_path = os.getenv('CP_CAP_SGE_UTILITY_BACKUP_PATH', default=os.path.join(os.getenv('HOME'), '.pipe/backups'))
api_url = os.environ['API']
run_id = os.environ['RUN_ID']
runs_root = os.getenv('CP_RUNS_ROOT_DIR', default='/runs')
pipeline_name = os.getenv('PIPELINE_NAME', default='DefaultPipeline')
run_dir = os.getenv('RUN_DIR', default=os.path.join(runs_root, pipeline_name + '-' + run_id))
common_repo_dir = os.getenv('COMMON_REPO_DIR', default=os.path.join(run_dir, 'CommonRepo'))
cap_scripts_dir = os.getenv('CP_CAP_SCRIPTS_DIR', default='/common/cap_scripts')
autoscaling_script_path = os.path.join(common_repo_dir, 'scripts', 'autoscale_grid_engine.py')
queue_profile_regexp = re.compile(PROFILE_QUEUE_PATTERN)
logging_logger_root = logging.getLogger()
logging_logger_root.setLevel(logging.WARNING)
logging_logger = logging.getLogger(name=logging_task)
logging_logger.setLevel(logging.DEBUG)
if not logging_logger.handlers:
console_formatter = logging.Formatter(logging_format_console)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging_level_console)
console_handler.setFormatter(console_formatter)
logging_logger.addHandler(console_handler)
file_formatter = logging.Formatter(logging_format_file)
file_handler = logging.FileHandler(os.path.join(logging_dir, logging_file))
file_handler.setLevel(logging_level_file)
file_handler.setFormatter(file_formatter)
logging_logger.addHandler(file_handler)
api = PipelineAPI(api_url=api_url, log_dir=logging_dir)
logger = RunLogger(api=api, run_id=run_id)
logger = TaskLogger(task=logging_task, inner=logger)
logger = LevelLogger(level=logging_level_run, inner=logger)
logger = LocalLogger(logger=logging_logger, inner=logger)
logger = ResilientLogger(inner=logger, fallback=LocalLogger(logger=logging_logger))
logger_warning = ExplicitLogger(level='WARNING', inner=logger)
allowed_groups = _resolve_allowed_groups(api)
executor = LocalExecutor()
executor = LoggingExecutor(logger=logger, inner=executor)
manager = GridEngineProfileManager(executor=executor, logger=logger, logger_warning=logger_warning,
cap_scripts_dir=cap_scripts_dir,
queue_profile_regexp=queue_profile_regexp,
autoscaling_script_path=autoscaling_script_path)
if backup_path:
manager = BackupProfileManager(inner=manager, logger=logger, path=backup_path)
if allowed_groups:
manager = SecurityProfileManager(inner=manager, api=api, logger=logger, allowed_groups=allowed_groups)
manager = ResilientProfileManager(inner=manager, logger=logger)
return manager
def _resolve_allowed_groups(api):
try:
allowed_groups_str = api.get_preference_value('ge.utility.allowed.groups') or DEFAULT_ALLOWED_GROUPS
except APIError:
allowed_groups_str = DEFAULT_ALLOWED_GROUPS
allowed_groups = set(group.strip() for group in allowed_groups_str.split(',') if group.strip())
allowed_groups.add(GROUP_ADMIN)
return allowed_groups
@click.group()
def cli():
"""
Grid engine profiles management utility.
It allows to interactively manage grid engine profiles (queues).
Examples:
I. Create new grid engine profile
sge create queue.q
II. Configure an existing grid engine profile
sge configure queue.q
III. Restart an existing grid engine profile
sge restart queue.q
IV. List all existing grid engine profiles
sge list
V. Export all existing grid engine profiles
sge export sge.export.tar.gz
VI. Export certain existing grid engine profiles
sge export queue1.q sge.export.tar.gz
sge export queue1.q queue2.q sge.export.tar.gz
"""
pass
@cli.command()
@click.argument('name', required=True, type=str)
def create(name):
"""
Creates profiles.
It creates a new grid engine profile (queue) and provides a text editor to configure it.
Depending on the value of CP_CAP_AUTOSCALE parameter (true/false)
the corresponding queue's autoscaler may be started.
Examples:
sge create queue.q
"""
manager = _get_manager()
manager.create(profile_name=name)
@cli.command()
@click.argument('name', required=True, type=str)
def configure(name):
"""
Configures profiles.
It provides a text editor to configure an existing grid engine profile (queue).
Depending on the value of CP_CAP_AUTOSCALE parameter (true/false)
the corresponding queue's autoscaler may be started/restarted/stopped.
Examples:
sge configure queue.q
"""
manager = _get_manager()
manager.configure(profile_name=name)
@cli.command()
@click.argument('name', required=True, type=str)
def restart(name):
"""
Restarts profiles.
Depending on the value of CP_CAP_AUTOSCALE parameter (true/false)
the corresponding queue's autoscaler may be restarted.
Examples:
sge restart queue.q
"""
manager = _get_manager()
manager.restart(profile_name=name)
@cli.command('list')
def ls():
"""
Lists profiles.
It lists all existing grid engine profiles (queues).
Examples:
sge list
"""
manager = _get_manager()
manager.list()
@cli.command()
@click.argument('names', required=False, type=str, nargs=-1)
@click.argument('output', required=True, type=str)
def export(names, output):
"""
Export profiles.
It exports either all or certain existing grid engine profiles (queues) to a single tar gz file.
Examples:
sge export sge.export.tar.gz
sge export queue1.q sge.export.tar.gz
sge export queue1.q queue2.q sge.export.tar.gz
"""
manager = _get_manager()
manager.export(profile_names=names, output_path=output)
if __name__ == '__main__':
cli(sys.argv[1:])