in luigi/worker.py [0:0]
def _get_work(self):
    """Poll the scheduler and package its reply as a ``GetWorkResponse``.

    When ``self.worker_processes`` is positive the scheduler is asked for
    work via ``get_work``; otherwise it is only asked for pending counts via
    ``count_pending``. If the reply names a task this worker has not
    scheduled itself, the task is loaded dynamically from the family/params
    carried in the reply.

    Side effects visible here:
      * appends an entry to ``self._get_work_response_history``;
      * may add the dynamically loaded task to ``self._scheduled_tasks``;
      * may record the batch members in ``self._batch_running_tasks``;
      * sets ``self.run_succeeded = False`` when dynamic loading fails.

    :return: a ``GetWorkResponse``; its ``task_id`` is ``None`` when the
             worker has stopped requesting work or when the task returned
             by the scheduler could not be loaded.
    """
    if self._stop_requesting_work:
        # Short-circuit without contacting the scheduler.
        # NOTE(review): positional order presumably matches the keyword
        # names used in the final return below -- confirm against the
        # GetWorkResponse definition.
        return GetWorkResponse(None, 0, 0, 0, 0, WORKER_STATE_DISABLED)

    if self.worker_processes > 0:
        logger.debug("Asking scheduler for work...")
        r = self._scheduler.get_work(
            worker=self._id,
            host=self.host,
            assistant=self._assistant,
            current_tasks=list(self._running_tasks.keys()),
        )
    else:
        logger.debug("Checking if tasks are still pending")
        r = self._scheduler.count_pending(worker=self._id)

    running_tasks = r['running_tasks']
    task_id = self._get_work_task_id(r)

    # Record the reply before any dynamic loading, so the history reflects
    # what the scheduler answered even if the load below fails.
    self._get_work_response_history.append({
        'task_id': task_id,
        'running_tasks': running_tasks,
    })

    if task_id is not None and task_id not in self._scheduled_tasks:
        logger.info('Did not schedule %s, will load it dynamically', task_id)

        try:
            # TODO: we should obtain the module name from the server!
            self._scheduled_tasks[task_id] = \
                load_task(module=r.get('task_module'),
                          task_name=r['task_family'],
                          params_str=r['task_params'])
        except TaskClassException as ex:
            self._handle_task_load_error(ex, [task_id])
            # Report "no task" to the caller and mark the run as failed.
            task_id = None
            self.run_succeeded = False

    if task_id is not None and 'batch_task_ids' in r:
        # Materialise as a list: on Python 3 ``filter`` returns a one-shot
        # iterator, which would be silently empty after its first traversal
        # and is always truthy. Unknown or falsy entries are dropped,
        # exactly as ``filter(None, ...)`` did.
        batch_tasks = [
            task
            for task in (
                self._scheduled_tasks.get(batch_id)
                for batch_id in r['batch_task_ids']
            )
            if task
        ]
        self._batch_running_tasks[task_id] = batch_tasks

    return GetWorkResponse(
        task_id=task_id,
        running_tasks=running_tasks,
        n_pending_tasks=r['n_pending_tasks'],
        n_unique_pending=r['n_unique_pending'],

        # TODO: For a tiny amount of time (a month?) we'll keep forwards compatibility
        #  That is you can use a newer client than server (Sep 2016)
        n_pending_last_scheduled=r.get('n_pending_last_scheduled', 0),
        worker_state=r.get('worker_state', WORKER_STATE_ACTIVE),
    )