in luigi/worker.py [0:0]
def _handle_next_task(self):
"""
We have to catch three ways a task can be "done":
1. normal execution: the task runs/fails and puts a result back on the queue,
2. new dependencies: the task yielded new deps that were not complete and
will be rescheduled and dependencies added,
3. child process dies: we need to catch this separately.
"""
self._idle_since = None
while True:
self._purge_children() # Deal with subprocess failures
try:
task_id, status, expl, missing, new_requirements = (
self._task_result_queue.get(
timeout=self._config.wait_interval))
except Queue.Empty:
return
task = self._scheduled_tasks[task_id]
if not task or task_id not in self._running_tasks:
continue
# Not a running task. Probably already removed.
# Maybe it yielded something?
# external task if run not implemented, retry-able if config option is enabled.
external_task_retryable = _is_external(task) and self._config.retry_external_tasks
if status == FAILED and not external_task_retryable:
self._email_task_failure(task, expl)
new_deps = []
if new_requirements:
new_req = [load_task(module, name, params)
for module, name, params in new_requirements]
for t in new_req:
self.add(t)
new_deps = [t.task_id for t in new_req]
self._add_task(worker=self._id,
task_id=task_id,
status=status,
expl=json.dumps(expl),
resources=task.process_resources(),
runnable=None,
params=task.to_str_params(),
family=task.task_family,
module=task.task_module,
new_deps=new_deps,
assistant=self._assistant,
retry_policy_dict=_get_retry_policy_dict(task))
self._running_tasks.pop(task_id)
# re-add task to reschedule missing dependencies
if missing:
reschedule = True
# keep out of infinite loops by not rescheduling too many times
for task_id in missing:
self.unfulfilled_counts[task_id] += 1
if (self.unfulfilled_counts[task_id] >
self._config.max_reschedules):
reschedule = False
if reschedule:
self.add(task)
self.run_succeeded &= (status == DONE) or (len(new_deps) > 0)
return