workflows/pipe-common/pipeline/hpc/engine/sge.py (538 lines of code) (raw):
import functools
from xml.etree import ElementTree
import math
import operator
from datetime import datetime
from pipeline.hpc.cmd import ExecutionError
from pipeline.hpc.engine.gridengine import GridEngine, GridEngineJobState, GridEngineJob, AllocationRule, \
GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, GridEngineLaunchAdapter, \
GridEngineJobProcessor
from pipeline.hpc.logger import Logger
from pipeline.hpc.resource import IntegralDemand, ResourceSupply, FractionalDemand, CustomResourceSupply, \
CustomResourceDemand
from pipeline.hpc.valid import WorkerValidatorHandler
class SunGridEngine(GridEngine):
    """Grid engine implementation driven by Sun Grid Engine command line tools.

    Every operation formats one of the command templates below and hands it
    to the configured command executor.
    """

    _DELETE_HOST = 'qconf -de %s'
    _SHOW_PE_ALLOCATION_RULE = 'qconf -sp %s | grep "^allocation_rule" | awk \'{print $2}\''
    _REMOVE_HOST_FROM_HOST_GROUP = 'qconf -dattr hostgroup hostlist %s %s'
    _REMOVE_HOST_FROM_QUEUE_SETTINGS = 'qconf -purge queue slots %s@%s'
    _SHUTDOWN_HOST_EXECUTION_DAEMON = 'qconf -ke %s'
    _REMOVE_HOST_FROM_ADMINISTRATIVE_HOSTS = 'qconf -dh %s'
    _QSTAT = 'qstat -u "*" -r -f -xml'
    _QHOST_RESOURCES = 'qhost -q -F -xml'
    _QHOST_GLOBAL_RESOURCES = 'qhost -h "*" -F -xml'
    _QSTAT_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
    _QMOD_DISABLE = 'qmod -d %s@%s'
    _QMOD_ENABLE = 'qmod -e %s@%s'
    _SHOW_EXECUTION_HOST = 'qconf -se %s'
    _KILL_JOBS = 'qdel %s'
    _FORCE_KILL_JOBS = 'qdel -f %s'

    def __init__(self, cmd_executor, queue, hostlist, queue_default,
                 gpu_resource_name, mem_resource_name, exc_resource_name):
        """Store the engine configuration.

        :param cmd_executor: object used to run commands; its ``execute`` and
            ``execute_to_lines`` methods are called by this class.
        :param queue: name of the queue this engine instance manages.
        :param hostlist: name of the host group hosts are removed from.
        :param queue_default: when truthy, jobs with neither a requested nor
            an actual queue are attributed to this queue.
        :param gpu_resource_name: name of the resource read as gpu.
        :param mem_resource_name: name of the resource read as mem.
        :param exc_resource_name: name of the resource read as exc.
        """
        self.cmd_executor = cmd_executor
        self.queue = queue
        self.hostlist = hostlist
        self.queue_default = queue_default
        # Name of the temporary XML child used to attach a queue name to a job
        self.tmp_queue_name_attribute = 'tmp_queue_name'
        self.gpu_resource_name = gpu_resource_name
        self.mem_resource_name = mem_resource_name
        self.exc_resource_name = exc_resource_name
        self.job_state_to_codes = {
            GridEngineJobState.RUNNING: ['r', 't', 'Rr', 'Rt'],
            GridEngineJobState.PENDING: ['qw', 'qw', 'hqw', 'hqw', 'hRwq', 'hRwq', 'hRwq', 'qw', 'qw'],
            GridEngineJobState.SUSPENDED: ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', 'Rts', 'RS', 'RtS', 'RT', 'RtT'],
            GridEngineJobState.ERROR: ['Eqw', 'Ehqw', 'EhRqw'],
            GridEngineJobState.DELETED: ['dr', 'dt', 'dRr', 'dRt', 'ds', 'dS', 'dT', 'dRs', 'dRS', 'dRT'],
            GridEngineJobState.COMPLETED: [],
            GridEngineJobState.UNKNOWN: []
        }

    def get_engine_type(self):
        """Return the grid engine type handled by this class."""
        return GridEngineType.SGE

    def get_jobs(self):
        """Return the jobs of the configured queue parsed from ``qstat`` XML output.

        ``job_list`` elements found under ``queue_info/Queue-List`` are
        collected as running jobs and those found under ``job_info`` as
        pending jobs. A job is skipped when its requested or actual queue
        differs from the configured one, or when it has no queue at all and
        the configured queue is not the default one. Each array task becomes
        a separate job with the id ``<job number>.<task id>``.

        :return: the collected ``GridEngineJob`` values, or an empty list if
            the ``qstat`` command fails.
        """
        try:
            output = self.cmd_executor.execute(SunGridEngine._QSTAT)
        except ExecutionError:
            Logger.warn('Grid engine jobs listing has failed.')
            return []
        jobs = {}
        root = ElementTree.fromstring(output)
        running_jobs = []
        queue_info = root.find('queue_info')
        # A missing section is treated as an empty one instead of crashing
        queue_lists = queue_info.findall('Queue-List') if queue_info is not None else []
        for queue_list in queue_lists:
            queue_name = queue_list.findtext('name')
            queue_running_jobs = queue_list.findall('job_list')
            for job_list in queue_running_jobs:
                # Attach the name of the enclosing queue to the job element so
                # that it can be read back in the common loop below
                job_queue_name = ElementTree.SubElement(job_list, self.tmp_queue_name_attribute)
                job_queue_name.text = queue_name
            running_jobs.extend(queue_running_jobs)
        job_info = root.find('job_info')
        pending_jobs = job_info.findall('job_list') if job_info is not None else []
        for job_list in running_jobs + pending_jobs:
            job_requested_queue = job_list.findtext('hard_req_queue')
            job_actual_queue, job_host = self._parse_queue_and_host(job_list.findtext(self.tmp_queue_name_attribute))
            if job_requested_queue and job_requested_queue != self.queue \
                    or job_actual_queue and job_actual_queue != self.queue:
                # filter out a job with actual/requested queue specified
                # if a configured queue is different from the job's one
                continue
            if not job_requested_queue and not job_actual_queue and not self.queue_default:
                # filter out a job without actual/requested queue specified
                # if a configured queue is not a default queue
                continue
            root_job_id = job_list.findtext('JB_job_number')
            job_tasks = self._parse_array(job_list.findtext('tasks'))
            job_ids = ['{}.{}'.format(root_job_id, job_task) for job_task in job_tasks] or [root_job_id]
            job_name = job_list.findtext('JB_name')
            job_user = job_list.findtext('JB_owner')
            job_state = GridEngineJobState.from_letter_code(job_list.findtext('state'), self.job_state_to_codes)
            job_datetime = self._parse_date(
                job_list.findtext('JAT_start_time') or job_list.findtext('JB_submission_time'))
            job_hosts = [job_host] if job_host else []
            requested_pe = job_list.find('requested_pe')
            job_pe = requested_pe.get('name') if requested_pe is not None else 'local'
            job_cpu = int(requested_pe.text if requested_pe is not None else '1')
            job_gpu, job_mem, job_exc, job_requests = self._parse_job_requests(job_list, root_job_id, job_user)
            for job_id in job_ids:
                if job_id in jobs:
                    # The same job listed again contributes one more host
                    job = jobs[job_id]
                    if job_host:
                        job.hosts.append(job_host)
                else:
                    jobs[job_id] = GridEngineJob(
                        id=job_id,
                        root_id=root_job_id,
                        name=job_name,
                        user=job_user,
                        state=job_state,
                        datetime=job_datetime,
                        # Each job gets its own containers: sharing them would
                        # make a later hosts.append leak into sibling tasks
                        hosts=list(job_hosts),
                        cpu=job_cpu,
                        gpu=job_gpu,
                        mem=job_mem,
                        exc=job_exc,
                        requests=dict(job_requests),
                        pe=job_pe
                    )
        return jobs.values()

    def _parse_job_requests(self, job_list, root_job_id, job_user):
        """Split the hard requests of a job into gpu, mem, exc and custom requests.

        Requests with an empty name or value and requests whose value cannot
        be parsed are logged; unparsable ones keep the default of 0.

        :return: tuple ``(gpu, mem, exc, requests)`` where ``requests`` maps
            the names of all other requests to their raw values.
        """
        job_gpu = 0
        job_mem = 0
        job_exc = 0
        job_requests = {}
        for request in job_list.findall('hard_request'):
            request_name = request.get('name', '').strip()
            request_value = request.text or ''
            if not request_name or not request_value:
                Logger.warn('Job #{job_id} by {job_user} has partial requirement: {name}={value}'
                            .format(job_id=root_job_id, job_user=job_user,
                                    name=request_name or '?', value=request_value or '?'))
                continue
            if request_name == self.gpu_resource_name:
                try:
                    job_gpu = self._parse_int(request_value)
                except ValueError:
                    Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                .format(job_id=root_job_id, job_user=job_user,
                                        name='gpu', value=request_value),
                                trace=True)
            elif request_name == self.mem_resource_name:
                try:
                    job_mem = self._parse_mem(request_value)
                except Exception:
                    Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                .format(job_id=root_job_id, job_user=job_user,
                                        name='mem', value=request_value),
                                trace=True)
            elif request_name == self.exc_resource_name:
                try:
                    job_exc = int(self._parse_bool(request_value))
                except Exception:
                    Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                .format(job_id=root_job_id, job_user=job_user,
                                        name='exc', value=request_value),
                                trace=True)
            else:
                job_requests[request_name] = request_value
        return job_gpu, job_mem, job_exc, job_requests

    def _parse_int(self, value):
        """Parse a numeric string, dropping any fractional part."""
        return int(float(value))

    def _parse_bool(self, bool_request):
        """Parse a boolean request value.

        :return: True for true/yes/on, False for false/no/off or an empty
            value (case and surrounding whitespace are ignored).
        :raises ValueError: for any other value.
        """
        if not bool_request:
            return False
        normalized = bool_request.strip().lower()
        if normalized in ['true', 'yes', 'on']:
            return True
        if normalized in ['false', 'no', 'off']:
            return False
        raise ValueError()

    def _parse_date(self, date):
        """Parse a date string in the ``%Y-%m-%dT%H:%M:%S`` format."""
        return datetime.strptime(date, SunGridEngine._QSTAT_DATETIME_FORMAT)

    def _parse_queue_and_host(self, queue_and_host):
        """Split a ``queue@host`` string into its queue and host parts.

        :return: tuple ``(queue, host)``; host is None when there is no ``@``
            and both are None for an empty value, so the result can always be
            unpacked into two names.
        """
        if not queue_and_host:
            return None, None
        parts = queue_and_host.split('@')[:2]
        return parts[0], (parts[1] if len(parts) > 1 else None)

    def _parse_array(self, array_jobs):
        """Expand a task specification into the list of task ids.

        The value is a comma separated list of items, each being a single id
        (``7``) or a range ``start-stop`` with an optional ``:step`` suffix
        (``1-9:2`` expands to 1, 3, 5, 7, 9).

        :return: list of int task ids, empty for an empty value.
        :raises ValueError: if an id, border or step is not an integer.
        """
        result = []
        if not array_jobs:
            return result
        for interval in array_jobs.split(','):
            array_borders, _, step = interval.partition(':')
            start, _, stop = array_borders.partition('-')
            if not stop:
                result.append(int(start))
                continue
            # Honour the step so that only existing task ids are produced;
            # a missing or non-positive step falls back to 1
            stride = max(int(step), 1) if step.strip() else 1
            result.extend(range(int(start), int(stop) + 1, stride))
        return result

    def _parse_mem(self, mem_request):
        """Convert a memory value to whole gibibytes, rounding up.

        A trailing k/m/g multiplies by a power of 1000 and a trailing K/M/G
        by a power of 1024; without a suffix the value is taken as bytes.
        The numeric part is parsed with ``_parse_int``, so its fractional
        part is dropped before the multiplier is applied.

        :return: size in gibibytes as int, 0 for an empty value.
        """
        if not mem_request:
            return 0
        modifiers = {
            'k': 1000, 'm': 1000 ** 2, 'g': 1000 ** 3,
            'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3
        }
        if mem_request[-1] in modifiers:
            number = self._parse_int(mem_request[:-1])
            modifier = modifiers[mem_request[-1]]
        else:
            number = self._parse_int(mem_request)
            modifier = 1
        size_in_bytes = number * modifier
        # Explicit float division: with integer division the quotient would be
        # truncated before ceil and the rounding up would be lost
        size_in_gibibytes = int(math.ceil(float(size_in_bytes) / modifiers['G']))
        return size_in_gibibytes

    def disable_host(self, host):
        """Run ``qmod -d`` for the configured queue on the given host."""
        self.cmd_executor.execute(SunGridEngine._QMOD_DISABLE % (self.queue, host))

    def enable_host(self, host):
        """Run ``qmod -e`` for the configured queue on the given host."""
        self.cmd_executor.execute(SunGridEngine._QMOD_ENABLE % (self.queue, host))

    def get_pe_allocation_rule(self, pe):
        """Return the allocation rule of a parallel environment.

        Falls back to ``AllocationRule.pe_slots()`` when the command
        produces no output.
        """
        exec_result = self.cmd_executor.execute(SunGridEngine._SHOW_PE_ALLOCATION_RULE % pe)
        return AllocationRule(exec_result.strip()) if exec_result else AllocationRule.pe_slots()

    def delete_host(self, host, skip_on_failure=False):
        """Remove a host from the grid engine.

        Runs, in order: execution daemon shutdown, removal from the queue
        settings, from the host group, from the administrative hosts and
        finally from the grid engine. ``skip_on_failure`` is forwarded to
        every step.
        """
        self._shutdown_execution_host(host, skip_on_failure=skip_on_failure)
        self._remove_host_from_queue_settings(host, self.queue, skip_on_failure=skip_on_failure)
        self._remove_host_from_host_group(host, self.hostlist, skip_on_failure=skip_on_failure)
        self._remove_host_from_administrative_hosts(host, skip_on_failure=skip_on_failure)
        self._remove_host_from_grid_engine(host, skip_on_failure=skip_on_failure)

    def get_global_supplies(self):
        """Yield a single ``CustomResourceSupply`` holding the global resources."""
        yield CustomResourceSupply(values=dict(self._get_global_resources()))

    def _get_global_resources(self):
        """Yield ``(name, value)`` pairs of resources from the global ``qhost`` query.

        Resources with an empty name or value are logged and skipped.
        """
        output = self.cmd_executor.execute(SunGridEngine._QHOST_GLOBAL_RESOURCES)
        root = ElementTree.fromstring(output)
        for host in root.findall('host'):
            for resource in host.findall('resourcevalue'):
                resource_name = resource.get('name', '').strip()
                resource_value = resource.text or ''
                if not resource_name or not resource_value:
                    Logger.warn('Global has partial resource: {name}={value}'
                                .format(name=resource_name or '?', value=resource_value or '?'))
                    continue
                yield resource_name, resource_value

    def get_host_supplies(self):
        """Yield the free supply of each host for the configured queue.

        For every host, gpu, mem and exc are read from its resource values
        (0 when absent or unparsable). For each entry of the configured queue
        on that host, the yielded supply is the queue slots together with
        those resources, minus the used and reserved slots.
        """
        output = self.cmd_executor.execute(SunGridEngine._QHOST_RESOURCES)
        root = ElementTree.fromstring(output)
        for host in root.findall('host'):
            host_name = host.get('name', '').strip()
            host_gpu = 0
            host_mem = 0
            host_exc = 0
            for resource in host.findall('resourcevalue'):
                resource_name = resource.get('name', '').strip()
                resource_value = resource.text or ''
                if not resource_name or not resource_value:
                    Logger.warn('Host {host_name} has partial resource: {name}={value}'
                                .format(host_name=host_name, name=resource_name or '?', value=resource_value or '?'))
                    continue
                if resource_name == self.gpu_resource_name:
                    try:
                        host_gpu = self._parse_int(resource_value)
                    except ValueError:
                        Logger.warn('Host {host_name} has invalid resource: {name}={value}'
                                    .format(host_name=host_name, name='gpu', value=resource_value),
                                    trace=True)
                elif resource_name == self.mem_resource_name:
                    try:
                        host_mem = self._parse_mem(resource_value)
                    except Exception:
                        Logger.warn('Host {host_name} has invalid resource: {name}={value}'
                                    .format(host_name=host_name, name='mem', value=resource_value),
                                    trace=True)
                elif resource_name == self.exc_resource_name:
                    try:
                        host_exc = self._parse_int(resource_value)
                    except Exception:
                        Logger.warn('Host {host_name} has invalid resource: {name}={value}'
                                    .format(host_name=host_name, name='exc', value=resource_value),
                                    trace=True)
            for queue in host.findall('queue[@name=\'%s\']' % self.queue):
                host_slots = int(queue.find('queuevalue[@name=\'slots\']').text or '0')
                host_used = int(queue.find('queuevalue[@name=\'slots_used\']').text or '0')
                host_resv = int(queue.find('queuevalue[@name=\'slots_resv\']').text or '0')
                yield (ResourceSupply(cpu=host_slots, gpu=host_gpu, mem=host_mem, exc=host_exc)
                       - ResourceSupply(cpu=host_used + host_resv))

    def get_host_supply(self, host):
        """Return the cpu supply of a host from its ``qconf -se`` output.

        The second whitespace separated token of the first line containing
        ``processors`` is used as the cpu count; an empty supply is returned
        when there is no such line.
        """
        for line in self.cmd_executor.execute_to_lines(SunGridEngine._SHOW_EXECUTION_HOST % host):
            if "processors" in line:
                return ResourceSupply(cpu=int(line.strip().split()[1]))
        return ResourceSupply()

    def _shutdown_execution_host(self, host, skip_on_failure):
        """Run ``qconf -ke`` for the host through ``_perform_command``."""
        _perform_command(
            action=lambda: self.cmd_executor.execute(SunGridEngine._SHUTDOWN_HOST_EXECUTION_DAEMON % host),
            msg='Shutdown GE host execution daemon.',
            error_msg='Shutdown GE host execution daemon has failed.',
            skip_on_failure=skip_on_failure
        )

    def _remove_host_from_queue_settings(self, host, queue, skip_on_failure):
        """Run ``qconf -purge queue slots`` for the queue and host through ``_perform_command``."""
        _perform_command(
            action=lambda: self.cmd_executor.execute(SunGridEngine._REMOVE_HOST_FROM_QUEUE_SETTINGS % (queue, host)),
            msg='Remove host from queue settings.',
            error_msg='Removing host from queue settings has failed.',
            skip_on_failure=skip_on_failure
        )

    def _remove_host_from_host_group(self, host, hostgroup, skip_on_failure):
        """Run ``qconf -dattr hostgroup hostlist`` for the host and group through ``_perform_command``."""
        _perform_command(
            action=lambda: self.cmd_executor.execute(SunGridEngine._REMOVE_HOST_FROM_HOST_GROUP % (host, hostgroup)),
            msg='Remove host from host group.',
            error_msg='Removing host from host group has failed.',
            skip_on_failure=skip_on_failure
        )

    def _remove_host_from_grid_engine(self, host, skip_on_failure):
        """Run ``qconf -de`` for the host through ``_perform_command``."""
        _perform_command(
            action=lambda: self.cmd_executor.execute(SunGridEngine._DELETE_HOST % host),
            msg='Remove host from GE.',
            error_msg='Removing host from GE has failed.',
            skip_on_failure=skip_on_failure
        )

    def _remove_host_from_administrative_hosts(self, host, skip_on_failure):
        """Run ``qconf -dh`` for the host through ``_perform_command``."""
        _perform_command(
            action=lambda: self.cmd_executor.execute(SunGridEngine._REMOVE_HOST_FROM_ADMINISTRATIVE_HOSTS % host),
            msg='Remove host from list of administrative hosts.',
            error_msg='Removing host from list of administrative hosts has failed.',
            skip_on_failure=skip_on_failure
        )

    def kill_jobs(self, jobs, force=False):
        """Run ``qdel`` (``qdel -f`` when ``force`` is set) with the ids of the given jobs."""
        job_ids = [str(job.id) for job in jobs]
        command = SunGridEngine._FORCE_KILL_JOBS if force else SunGridEngine._KILL_JOBS
        self.cmd_executor.execute(command % ' '.join(job_ids))
class SunGridEngineDefaultDemandSelector(GridEngineDemandSelector):
    """Turns jobs into resource demands, taking the current host supplies into account."""

    def __init__(self, grid_engine):
        self.grid_engine = grid_engine

    def select(self, jobs):
        """Yield the demand of each job that is not already covered.

        Jobs are visited in ``root_id`` order. For a fractional allocation
        rule the summed host supply is subtracted from the demand and the
        leftover supply is carried on to the following jobs; otherwise the
        whole demand is yielded and the supply is left untouched. A job whose
        remaining demand is empty is logged and skipped.
        """
        available = functools.reduce(operator.add, self.grid_engine.get_host_supplies(), ResourceSupply())
        rules_by_pe = {}
        for job in sorted(jobs, key=operator.attrgetter('root_id')):
            # Allocation rules are looked up once per parallel environment
            rule = rules_by_pe.get(job.pe) or self.grid_engine.get_pe_allocation_rule(job.pe)
            rules_by_pe[job.pe] = rule
            if rule in AllocationRule.fractional_rules():
                demand = FractionalDemand(cpu=job.cpu, gpu=job.gpu, mem=job.mem, exc=job.exc, owner=job.user)
                unmet_demand, leftover_supply = demand.subtract(available)
            else:
                demand = IntegralDemand(cpu=job.cpu, gpu=job.gpu, mem=job.mem, exc=job.exc, owner=job.user)
                unmet_demand, leftover_supply = demand, available
            if unmet_demand:
                available = leftover_supply
                yield unmet_demand
                continue
            Logger.warn('Ignoring job #{job_id} {job_name} by {job_user} because '
                        'it is pending even though '
                        'it requires resources which are available at the moment: '
                        '{job_resources}...'
                        .format(job_id=job.id, job_name=job.name, job_user=job.user,
                                job_resources=self._as_resources_str(demand, available)))

    def _as_resources_str(self, demand, supply):
        """Render ``demand/supply name`` pairs for cpu, gpu, mem and exc."""
        rendered = []
        for key in ['cpu', 'gpu', 'mem', 'exc']:
            rendered.append('{demand}/{supply} {name}'
                            .format(name=key,
                                    demand=getattr(demand, key),
                                    supply=getattr(supply, key)))
        return ', '.join(rendered)
class SunGridEngineGlobalDemandSelector(GridEngineDemandSelector):
    """Drops jobs whose global resource requests cannot be met, then delegates to an inner selector."""

    def __init__(self, inner, grid_engine):
        self._inner = inner
        self._grid_engine = grid_engine

    def select(self, jobs):
        """Pass the jobs which survive ``filter`` to the inner selector."""
        eligible_jobs = list(self.filter(jobs))
        return self._inner.select(eligible_jobs)

    def filter(self, jobs):
        """Yield the jobs whose global resource requests fit the remaining global supply.

        Jobs are visited in ``root_id`` order; every accepted job reduces the
        supply seen by the following ones. Rejected jobs are logged.
        """
        available = CustomResourceSupply()
        for global_supply in self._grid_engine.get_global_supplies():
            available = available + self._get_int_supply(global_supply)
        for job in sorted(jobs, key=operator.attrgetter('root_id')):
            # Only requests naming a known global resource take part in the check
            demand = self._get_job_int_demand(job, keys=available.values.keys())
            unmet_demand, leftover_supply = demand.subtract(available)
            if not unmet_demand:
                available = leftover_supply
                yield job
                continue
            Logger.warn('Ignoring job #{job_id} {job_name} by {job_user} because '
                        'it requires global resources which are not available at the moment: '
                        '{job_resources}...'
                        .format(job_id=job.id, job_name=job.name, job_user=job.user,
                                job_resources=self._as_resources_str(demand, available)))

    def _get_job_int_demand(self, job, keys):
        """Build a demand from the job requests named in ``keys``."""
        int_requests = dict(self._get_job_int_requests(job, keys))
        return CustomResourceDemand(values=int_requests)

    def _get_job_int_requests(self, job, keys):
        """Yield ``(name, int value)`` for the job requests named in ``keys``.

        Requests whose value is not numeric are logged and skipped.
        """
        for name, raw_value in job.requests.items():
            if name in keys:
                try:
                    yield name, self._parse_int(raw_value)
                except ValueError:
                    Logger.warn('Job #{job_id} by {job_user} has unsupported requirement: {name}={value}'
                                .format(job_id=job.root_id, job_user=job.user,
                                        name=name, value=raw_value),
                                trace=True)

    def _get_int_supply(self, supply):
        """Build a supply holding only the integer-convertible resources of ``supply``."""
        int_resources = dict(self._get_int_resources(supply))
        return CustomResourceSupply(values=int_resources)

    def _get_int_resources(self, supply):
        """Yield ``(name, int value)`` for each resource; non-numeric ones are logged and skipped."""
        for name, raw_value in supply.values.items():
            try:
                yield name, self._parse_int(raw_value)
            except ValueError:
                Logger.warn('Global has unsupported resource: {name}={value}'
                            .format(name=name, value=raw_value),
                            trace=True)

    def _parse_int(self, value):
        """Parse a numeric string, dropping any fractional part."""
        as_float = float(value)
        return int(as_float)

    def _as_resources_str(self, custom_demand, custom_supply):
        """Render ``demand/supply name`` for every resource named in the demand."""
        rendered = []
        for key in custom_demand.values.keys():
            rendered.append('{demand}/{supply} {name}'
                            .format(name=key,
                                    demand=custom_demand.values.get(key, 0),
                                    supply=custom_supply.values.get(key, 0)))
        return ', '.join(rendered)
class SunGridEngineHostWorkerValidatorHandler(WorkerValidatorHandler):
    """Validates a worker by querying it as an execution host with ``qconf -se``."""

    def __init__(self, cmd_executor):
        self._cmd_executor = cmd_executor
        self._cmd = 'qconf -se %s'

    def is_valid(self, host):
        """Tell whether the host is considered available.

        :return: True when the command succeeds, or fails with an error that
            matches none of the known texts; False when the error text says
            the host is not an execution host or cannot be resolved.
        """
        try:
            self._cmd_executor.execute(self._cmd % host)
        except RuntimeError as e:
            error_text = str(e)
            # Error fragments that make the host unavailable, with their messages
            unavailable_cases = (
                ('not an execution host',
                 'Execution host {host} not found in GE which makes host unavailable'),
                ('can\'t resolve hostname',
                 'Execution host {host} not found in GE (DNS) which makes host unavailable'),
            )
            for fragment, message in unavailable_cases:
                if fragment in error_text:
                    Logger.warn(message.format(host=host), crucial=True, trace=True)
                    return False
            Logger.warn('Execution host {host} not found in GE but it is considered available'
                        .format(host=host),
                        crucial=True, trace=True)
            return True
        return True
class SunGridEngineStateWorkerValidatorHandler(WorkerValidatorHandler):
    """Validates a worker by the state string of its queue in ``qhost -q -xml`` output."""

    def __init__(self, cmd_executor, queue):
        self._cmd_executor = cmd_executor
        self._queue = queue
        self._cmd = 'qhost -q -xml'
        self._host_bad_states = ['u', 'E', 'd']

    def is_valid(self, host):
        """Tell whether the host is considered available.

        :return: False when the state string of the configured queue on the
            host contains one of the bad state letters, or when the command
            raises RuntimeError; True otherwise, including when the host or
            the queue is missing from the output.
        """
        try:
            root = ElementTree.fromstring(self._cmd_executor.execute(self._cmd))
            host_path = 'host[@name=\'%s\']' % host
            for host_object in root.findall(host_path):
                queue_path = 'queue[@name=\'%s\']' % self._queue
                for queue in host_object.findall(queue_path):
                    host_states = queue.find('queuevalue[@name=\'state_string\']').text or ''
                    # The first bad letter of the state string decides the outcome
                    bad_state = next((state for state in host_states
                                      if state in self._host_bad_states), None)
                    if bad_state is not None:
                        Logger.warn('Execution host {host} GE state is {host_state} which makes host unavailable'
                                    .format(host=host, host_state=bad_state),
                                    crucial=True)
                        return False
                    if host_states:
                        Logger.warn('Execution host {host} GE state is {host_state} but it is considered available'
                                    .format(host=host, host_state=', '.join(host_states)),
                                    crucial=True)
            return True
        except RuntimeError:
            Logger.warn('Execution host {host} GE state not found which makes host unavailable'
                        .format(host=host),
                        crucial=True, trace=True)
            return False
class SunGridEngineJobValidator(GridEngineJobValidator):
    """Splits jobs into those which fit the configured maximum supplies and those which do not."""

    def __init__(self, grid_engine, instance_max_supply, cluster_max_supply):
        self.grid_engine = grid_engine
        self.instance_max_supply = instance_max_supply
        self.cluster_max_supply = cluster_max_supply

    def validate(self, jobs):
        """Partition jobs by whether their demand exceeds the applicable limit.

        Jobs with a fractional allocation rule are compared with the cluster
        maximum supply, all other jobs with the instance maximum supply.
        Jobs exceeding the limit are logged.

        :return: tuple ``(valid_jobs, invalid_jobs)``.
        """
        accepted, rejected = [], []
        rules_by_pe = {}
        for job in jobs:
            # Allocation rules are looked up once per parallel environment
            rule = rules_by_pe.get(job.pe) or self.grid_engine.get_pe_allocation_rule(job.pe)
            rules_by_pe[job.pe] = rule
            job_demand = IntegralDemand(cpu=job.cpu, gpu=job.gpu, mem=job.mem, exc=job.exc)
            if rule in AllocationRule.fractional_rules():
                limit = self.cluster_max_supply
                message = ('Invalid job #{job_id} {job_name} by {job_user} requires resources '
                           'which cannot be satisfied by the cluster: '
                           '{job_cpu}/{available_cpu} cpu, '
                           '{job_gpu}/{available_gpu} gpu, '
                           '{job_mem}/{available_mem} mem, '
                           '{job_exc}/{available_exc} exc.')
            else:
                limit = self.instance_max_supply
                message = ('Invalid job #{job_id} {job_name} by {job_user} requires resources '
                           'which cannot be satisfied by the biggest instance in cluster: '
                           '{job_cpu}/{available_cpu} cpu, '
                           '{job_gpu}/{available_gpu} gpu, '
                           '{job_mem}/{available_mem} mem, '
                           '{job_exc}/{available_exc} exc.')
            if not job_demand > limit:
                accepted.append(job)
                continue
            Logger.warn(message.format(job_id=job.id, job_name=job.name, job_user=job.user,
                                       job_cpu=job.cpu, available_cpu=limit.cpu,
                                       job_gpu=job.gpu, available_gpu=limit.gpu,
                                       job_mem=job.mem, available_mem=limit.mem,
                                       job_exc=job.exc, available_exc=limit.exc),
                        crucial=True)
            rejected.append(job)
        return accepted, rejected
class SunGridEngineCustomRequestsPurgeJobProcessor(GridEngineJobProcessor):
    """Rewrites the resource requests of jobs so that only gpu, mem and exc requests remain."""

    def __init__(self, cmd_executor, gpu_resource_name, mem_resource_name, exc_resource_name, dry_run):
        self._cmd_executor = cmd_executor
        self._gpu_resource_name = gpu_resource_name
        self._mem_resource_name = mem_resource_name
        self._exc_resource_name = exc_resource_name
        self._default_resource_names = [self._gpu_resource_name,
                                        self._mem_resource_name,
                                        self._exc_resource_name]
        self._dry_run = dry_run
        self._cmd = 'qalter {job_id} -l "{job_requests}"'

    def process(self, jobs):
        """Purge custom requests where needed and partition the jobs.

        A job stays relevant when its id differs from its root id, when all
        of its requests use default resource names, or when purging fails.
        A job whose custom requests were purged becomes irrelevant.

        :return: tuple ``(relevant_jobs, irrelevant_jobs)``.
        """
        kept, purged = [], []
        for job in jobs:
            if job.root_id != job.id \
                    or not any(name not in self._default_resource_names for name in job.requests):
                kept.append(job)
                continue
            try:
                Logger.info('Purging job #{} custom requirements...'.format(job.id))
                self._purge_custom_requests(job)
            except Exception:
                Logger.warn('Job #{} custom requirements purge has failed'.format(job.id), crucial=True, trace=True)
                kept.append(job)
            else:
                purged.append(job)
        return kept, purged

    def _purge_custom_requests(self, job):
        """Run ``qalter`` for the root job with only its gpu, mem and exc requests.

        Nothing is executed in dry run mode. Resources with a falsy value
        are left out of the request list.
        """
        if self._dry_run:
            return
        remaining_requests = {}
        if job.gpu:
            remaining_requests[self._gpu_resource_name] = str(job.gpu)
        if job.mem:
            remaining_requests[self._mem_resource_name] = str(job.mem) + 'G'
        if job.exc:
            remaining_requests[self._exc_resource_name] = str(bool(job.exc)).lower()
        requests_arg = ','.join('{}={}'.format(name, value)
                                for name, value in remaining_requests.items())
        command = self._cmd.format(job_id=job.root_id, job_requests=requests_arg)
        self._cmd_executor.execute(command)
class SunGridEngineLaunchAdapter(GridEngineLaunchAdapter):
    """Provides the init task name and launch parameters used for SGE workers."""

    def __init__(self, queue, hostlist):
        self._queue = queue
        self._hostlist = hostlist

    def get_worker_init_task_name(self):
        """Return the name of the worker initialization task."""
        return 'SGEWorkerSetup'

    def get_worker_launch_params(self):
        """Return the launch parameters carrying the queue and host list names."""
        launch_params = {'CP_CAP_SGE': 'false'}
        launch_params['CP_CAP_SGE_QUEUE_NAME'] = self._queue
        launch_params['CP_CAP_SGE_HOSTLIST_NAME'] = self._hostlist
        return launch_params