in luigi/worker.py [0:0]
def _add(self, task, is_complete):
    """Work out how ``task`` should be scheduled and register it.

    This is a generator. For a task that still has to run, every
    dependency returned by ``task.deps()`` is validated, announced via a
    ``DEPENDENCY_DISCOVERED`` event and yielded so the caller can add it
    too. Only once the generator is resumed past the last dependency is
    the task recorded in ``self._scheduled_tasks`` and passed to
    ``self._add_task``.

    :param task: the task to register; its ``task_id``, ``disabled``,
        ``priority`` and related attributes are read here.
    :param is_complete: completeness result for ``task``. It is handed to
        ``self._check_complete_value`` first; any exception raised there
        other than ``KeyboardInterrupt`` is treated as a completeness
        check failure.
    :raises KeyboardInterrupt: re-raised unchanged from the completeness
        check.
    """
    task_limit = self._config.task_limit

    # Outcome shared by every "cannot schedule this" path; the branches
    # below override only the values that differ.
    deps, status, runnable = None, UNKNOWN, False

    if task_limit is not None and len(self._scheduled_tasks) >= task_limit:
        logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, task_limit)
    else:
        error_trace = None
        try:
            self._check_complete_value(is_complete)
        except KeyboardInterrupt:
            raise
        except AsyncCompletionException as async_err:
            # The exception carries its own pre-formatted traceback.
            error_trace = async_err.trace
        except BaseException:
            error_trace = traceback.format_exc()

        if error_trace is not None:
            # The completeness check itself failed.
            self.add_succeeded = False
            self._log_complete_error(task, error_trace)
            task.trigger_event(Event.DEPENDENCY_MISSING, task)
            self._email_complete_error(task, error_trace)
        elif is_complete:
            status = DONE
            task.trigger_event(Event.DEPENDENCY_PRESENT, task)
        elif _is_external(task):
            status = PENDING
            runnable = self._config.retry_external_tasks
            task.trigger_event(Event.DEPENDENCY_MISSING, task)
            logger.warning('Data for %s does not exist (yet?). The task is an '
                           'external data dependency, so it cannot be run from'
                           ' this luigi process.', task)
        else:
            try:
                required = task.deps()
                self._add_task_batcher(task)
            except Exception as dep_err:
                error_trace = traceback.format_exc()
                self.add_succeeded = False
                self._log_dependency_error(task, error_trace)
                task.trigger_event(Event.BROKEN_TASK, task, dep_err)
                self._email_dependency_error(task, error_trace)
            else:
                # Bind deps only when both calls succeeded, so a failure
                # in either one leaves deps as None.
                deps, status, runnable = required, PENDING, True

        # A disabled task overrides whatever status was chosen above.
        if task.disabled:
            status = DISABLED

        if deps:
            for dep in deps:
                self._validate_dependency(dep)
                task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, dep)
                yield dep  # hand the dependency back so it gets added too

            # _add_task receives task ids rather than task objects.
            deps = [dep.task_id for dep in deps]

    self._scheduled_tasks[task.task_id] = task
    self._add_task(
        worker=self._id,
        task_id=task.task_id,
        status=status,
        deps=deps,
        runnable=runnable,
        priority=task.priority,
        resources=task.process_resources(),
        params=task.to_str_params(),
        family=task.task_family,
        module=task.task_module,
        batchable=task.batchable,
        retry_policy_dict=_get_retry_policy_dict(task),
        accepts_messages=task.accepts_messages,
    )