in luigi/worker.py [0:0]
def add(self, task, multiprocess=False, processes=0):
    """
    Add a Task for the worker to check and possibly schedule and run.

    Returns True if task and its dependencies were successfully scheduled or completed before.

    Starting from ``task``, a ``check_complete`` call is dispatched through a pool for
    every task discovered; each ``(task, is_complete)`` result read back from the queue
    is handed to ``self._add``, and the tasks it yields are validated and checked in
    turn. Each task id is dispatched at most once per call.

    :param task: root task to add. It is validated with ``self._validate_task`` before
        any queue or pool is created.
    :param multiprocess: when true, run the checks in a ``multiprocessing.Pool`` that
        reports through a manager queue; otherwise use ``SingleProcessPool`` together
        with ``DequeQueue``.
    :param processes: pool size for the multiprocess case; a value ``<= 0`` passes
        ``None`` so that ``multiprocessing.Pool`` picks its own default.
    :return: ``self.add_succeeded``. It is set to True on entry; presumably
        ``self._add`` clears it when a task could not be scheduled (verify there).
    :raises: whatever ``self._validate_task`` raises for the root task. During the
        walk, ``KeyboardInterrupt`` and ``TaskException`` propagate untouched; any
        other exception is logged, reported through ``Event.BROKEN_TASK`` on the root
        task and by email, and then re-raised.
    """
    if self._first_task is None and hasattr(task, 'task_id'):
        self._first_task = task.task_id
    self.add_succeeded = True

    # Validate before creating the queue and pool: this happens outside the
    # try/finally below, so a rejected root task must not leave behind a pool
    # that nobody closes.
    self._validate_task(task)

    if multiprocess:
        queue = multiprocessing.Manager().Queue()
        pool = multiprocessing.Pool(processes=processes if processes > 0 else None)
    else:
        queue = DequeQueue()
        pool = SingleProcessPool()
    pool.apply_async(check_complete, [task, queue, self._task_completion_cache])

    # we track queue size ourselves because len(queue) won't work for multiprocessing
    queue_size = 1
    try:
        seen = {task.task_id}
        while queue_size:
            item, is_complete = queue.get()
            queue_size -= 1
            for dep in self._add(item, is_complete):
                if dep.task_id in seen:
                    continue
                self._validate_task(dep)
                seen.add(dep.task_id)
                pool.apply_async(check_complete, [dep, queue, self._task_completion_cache])
                # one more result is now expected on the queue
                queue_size += 1
    except (KeyboardInterrupt, TaskException):
        raise
    except Exception as ex:
        self.add_succeeded = False
        # capture the traceback first, before the reporting calls below run
        formatted_traceback = traceback.format_exc()
        self._log_unexpected_error(task)
        task.trigger_event(Event.BROKEN_TASK, task, ex)
        self._email_unexpected_error(task, formatted_traceback)
        raise
    finally:
        # always release the pool, whether the walk finished or raised
        pool.close()
        pool.join()
    return self.add_succeeded