in luigi/scheduler.py [0:0]
def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, **kwargs):
    # TODO: remove any expired nodes

    # Algo: iterate over all nodes, find the highest-priority node with no
    # dependencies and with available resources.

    # Resource checking looks both at currently available resources and at which
    # resources would be available if all running tasks died and we rescheduled
    # all workers greedily. We do both checks in order to prevent a worker with
    # many low-priority tasks from starving other workers with higher-priority
    # tasks that share the same resources.

    # TODO: remove tasks that can't be done, figure out if the worker has
    # absolutely nothing it can wait for

    if self._config.prune_on_get_work:
        self.prune()

    assert worker is not None
    worker_id = worker
    worker = self._update_worker(
        worker_id,
        worker_reference={'host': host},
        get_work=True)
    if not worker.enabled:
        reply = {'n_pending_tasks': 0,
                 'running_tasks': [],
                 'task_id': None,
                 'n_unique_pending': 0,
                 'worker_state': worker.state,
                 }
        return reply
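
    # Assistant workers may pick up any runnable task, not just tasks that were
    # scheduled to them explicitly, so record the assistant flag on the worker.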
    if assistant:
        self.add_worker(worker_id, [('assistant', assistant)])
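
    # Batching state: batched_params will map each batch parameter name to the
    # list of values gathered so far, unbatched_params holds the values every
    # batch member must share, and max_batch_size stays at 1 until a batcher is
    # found for the chosen task's family.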
    batched_params, unbatched_params, batched_tasks, max_batch_size = None, None, [], 1
    best_task = None
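
    # A task recorded as running on this worker but missing from current_tasks
    # was not claimed since the last get_work (e.g. the worker restarted), so
    # offer it back to the worker rather than assigning new work.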
    if current_tasks is not None:
        ct_set = set(current_tasks)
        for task in sorted(self._state.get_active_tasks_by_status(RUNNING), key=self._rank):
            if task.worker_running == worker_id and task.id not in ct_set:
                best_task = task

    if current_tasks is not None:
        # batch running tasks that weren't claimed since the last get_work go back in the pool
        self._reset_orphaned_batch_running_tasks(worker_id)
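
    # greedy_resources tracks what would be consumed if all active workers were
    # greedily rescheduled onto the remaining tasks; it implements the second
    # resource check described above.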
    greedy_resources = collections.defaultdict(int)

    worker = self._state.get_worker(worker_id)
    if self._paused:
        relevant_tasks = []
    elif worker.is_trivial_worker(self._state):
        relevant_tasks = worker.get_tasks(self._state, PENDING, RUNNING)
        used_resources = collections.defaultdict(int)
        greedy_workers = dict()  # If there are no resources, then they can grab any task
    else:
        relevant_tasks = self._state.get_active_tasks_by_status(PENDING, RUNNING)
        used_resources = self._used_resources()
        activity_limit = time.time() - self._config.worker_disconnect_delay
        active_workers = self._state.get_active_workers(last_get_work_gt=activity_limit)
        greedy_workers = dict((worker.id, worker.info.get('workers', 1))
                              for worker in active_workers)
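
    # Consider candidates best-ranked first.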
    tasks = list(relevant_tasks)
    tasks.sort(key=self._rank, reverse=True)
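
    # Each pass over a task does one of three things: extend the batch of an
    # already-chosen best_task, account for a RUNNING task in the greedy
    # simulation, or try to select the task for this worker.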
    for task in tasks:
        if (best_task and batched_params and task.family == best_task.family and
                len(batched_tasks) < max_batch_size and task.is_batchable() and all(
                task.params.get(name) == value for name, value in unbatched_params.items()) and
                task.resources == best_task.resources and self._schedulable(task)):
            for name, params in batched_params.items():
                params.append(task.params.get(name))
            batched_tasks.append(task)
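
        # Once a best task is chosen, later passes can only grow its batch.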
        if best_task:
            continue
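
        # A running task consumes one of its worker's greedy slots plus its
        # resources in the simulation.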
        if task.status == RUNNING and (task.worker_running in greedy_workers):
            greedy_workers[task.worker_running] -= 1
            for resource, amount in (getattr(task, 'resources_running', task.resources) or {}).items():
                greedy_resources[resource] += amount
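
        # Assign the task only if its resources fit both what is free right now
        # (used_resources) and what the greedy simulation leaves over.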
        if self._schedulable(task) and self._has_resources(task.resources, greedy_resources):
            in_workers = (assistant and task.runnable) or worker_id in task.workers
            if in_workers and self._has_resources(task.resources, used_resources):
                best_task = task
                batch_param_names, max_batch_size = self._state.get_batcher(
                    worker_id, task.family)
                if batch_param_names and task.is_batchable():
                    try:
                        batched_params = {
                            name: [task.params[name]] for name in batch_param_names
                        }
                        unbatched_params = {
                            name: value for name, value in task.params.items()
                            if name not in batched_params
                        }
                        batched_tasks.append(task)
                    except KeyError:
                        batched_params, unbatched_params = None, None
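            # This worker cannot take the task right now, so in the greedy
            # simulation some other eligible worker claims it, using up one of
            # its slots and the task's resources.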
            else:
                workers = itertools.chain(task.workers, [worker_id]) if assistant else task.workers
                for task_worker in workers:
                    if greedy_workers.get(task_worker, 0) > 0:
                        # use up a worker
                        greedy_workers[task_worker] -= 1

                        # keep track of the resources used in greedy scheduling
                        for resource, amount in (task.resources or {}).items():
                            greedy_resources[resource] += amount

                        break
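
    # Base reply: pending counts, currently running tasks and the worker's state.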
    reply = self.count_pending(worker_id)
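
    # More than one batched task: hand them out as a single batch under a
    # deterministic batch_id (an md5 over the joined task ids, flagged as not
    # used for security), and mark every member as batch-running on this worker.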
    if len(batched_tasks) > 1:
        batch_string = '|'.join(task.id for task in batched_tasks)
        batch_id = hashlib.new('md5', batch_string.encode('utf-8'), usedforsecurity=False).hexdigest()
        for task in batched_tasks:
            self._state.set_batch_running(task, batch_id, worker_id)

        combined_params = best_task.params.copy()
        combined_params.update(batched_params)

        reply['task_id'] = None
        reply['task_family'] = best_task.family
        reply['task_module'] = getattr(best_task, 'module', None)
        reply['task_params'] = combined_params
        reply['batch_id'] = batch_id
        reply['batch_task_ids'] = [task.id for task in batched_tasks]
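
    # A single best task (possibly a re-offered running task): mark it RUNNING
    # and return its parameters to the worker.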
    elif best_task:
        self.update_metrics_task_started(best_task)
        self._state.set_status(best_task, RUNNING, self._config)
        best_task.worker_running = worker_id
        best_task.resources_running = best_task.resources.copy()
        best_task.time_running = time.time()
        self._update_task_history(best_task, RUNNING, host=host)

        reply['task_id'] = best_task.id
        reply['task_family'] = best_task.family
        reply['task_module'] = getattr(best_task, 'module', None)
        reply['task_params'] = best_task.params
    else:
        reply['task_id'] = None

    return reply
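

# Illustrative only, not part of the scheduler: a sketch of the reply a worker
# might receive in the batched case, with hypothetical family/module names and
# placeholder task ids. The keys come from count_pending() plus the assignments
# above.
#
#     {'n_pending_tasks': 3,
#      'n_unique_pending': 2,
#      'running_tasks': [],
#      'worker_state': 'active',
#      'task_id': None,
#      'task_family': 'MyTask',
#      'task_module': 'my_module',
#      'task_params': {'date': ['2024-01-01', '2024-01-02']},
#      'batch_id': '<md5 hex digest>',
#      'batch_task_ids': ['<task id 1>', '<task id 2>']}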