in luigi/scheduler.py [0:0]
def add_task(self, task_id=None, status=PENDING, runnable=True,
             deps=None, new_deps=None, expl=None, resources=None,
             priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False,
             assistant=False, tracking_url=None, worker=None, batchable=None,
             batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
    """
    Register a task with the scheduler, or update the one already known.

    * add task identified by task_id if it doesn't exist
    * if deps is not None, update dependency list
    * update status of task
    * add additional workers/stakeholders
    * update priority when needed

    :param task_id: identifier of the task to add or update.
    :param status: status reported by the worker. ``SUSPENDED`` is stored
        as ``PENDING``.
    :param runnable: when truthy (and the status is not ``FAILED`` and the
        worker is enabled) the worker is registered on the task and
        ``task.runnable`` is set.
    :param deps: when not None, replaces the task's dependency set.
    :param new_deps: when not None, added to the task's dependency set. A
        non-empty value also allows the status update to go through while
        the task is running.
    :param expl: explanation stored on the task and copied to the running
        tasks of the same batch. For batched failure emails it is
        JSON-decoded when possible and passed through unchanged otherwise.
    :param resources: resource mapping; a copy of it (or an empty dict when
        None) is stored on the task.
    :param priority: priority used for newly created tasks and handed to
        ``_update_priority``.
    :param family: task family, applied only if the task has none yet.
    :param module: task module, applied only if the task has none yet.
    :param params: task parameters, applied only if the task has none yet.
    :param param_visibilities: applied only if the task has none yet.
    :param accepts_messages: stored on the task when not None.
    :param assistant: when truthy the worker is not added as a stakeholder.
    :param tracking_url: stored on the task (and on the running tasks of
        its batch) when not None, or whenever the task is not ``RUNNING``.
    :param worker: id of the reporting worker; must not be None.
    :param batchable: stored on the task when not None.
    :param batch_id: stored on the task when not None.
    :param retry_policy_dict: passed to ``_generate_retry_policy``; the
        result is always assigned to the task.
    :param owners: forwarded to the email batcher on failure.
    :param kwargs: accepted for call compatibility; not used here.
    :return: None.
    """
    assert worker is not None
    worker_id = worker
    # From here on ``worker`` is the worker object, ``worker_id`` its id.
    worker = self._update_worker(worker_id)

    resources = {} if resources is None else resources.copy()

    if retry_policy_dict is None:
        retry_policy_dict = {}

    retry_policy = self._generate_retry_policy(retry_policy_dict)

    # Only enabled workers may cause a new task to be created.
    if worker.enabled:
        _default_task = self._make_task(
            task_id=task_id, status=PENDING, deps=deps, resources=resources,
            priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities,
        )
    else:
        _default_task = None

    task = self._state.get_task(task_id, setdefault=_default_task)

    # A disabled worker may only report on a task that is currently RUNNING.
    if task is None or (task.status != RUNNING and not worker.enabled):
        return

    # Ignore claims that the task is PENDING if it very recently was marked as DONE.
    if (status == PENDING and task.status == DONE
            and (time.time() - task.updated) < self._config.stable_done_cooldown_secs):
        return

    # for setting priority, we'll sometimes create tasks with unset family and params
    if not task.family:
        task.family = family
    if not getattr(task, 'module', None):
        task.module = module
    if not getattr(task, 'param_visibilities', None):
        task.param_visibilities = _get_default(param_visibilities, {})
    if not task.params:
        task.set_params(params)

    if batch_id is not None:
        task.batch_id = batch_id
    if status == RUNNING and not task.worker_running:
        # First RUNNING report for this task: record who runs it and since when.
        task.worker_running = worker_id
        if batch_id:
            # copy resources_running of the first batch task
            batch_tasks = self._state.get_batch_running_tasks(batch_id)
            task.resources_running = batch_tasks[0].resources_running.copy()
        task.time_running = time.time()

    if accepts_messages is not None:
        task.accepts_messages = accepts_messages

    # A None tracking_url does not overwrite the URL of a RUNNING task.
    if tracking_url is not None or task.status != RUNNING:
        task.tracking_url = tracking_url
        if task.batch_id is not None:
            for batch_task in self._state.get_batch_running_tasks(task.batch_id):
                batch_task.tracking_url = tracking_url

    if batchable is not None:
        task.batchable = batchable

    if task.remove is not None:
        task.remove = None  # unmark task for removal so it isn't removed after being added

    if expl is not None:
        task.expl = expl
        if task.batch_id is not None:
            for batch_task in self._state.get_batch_running_tasks(task.batch_id):
                batch_task.expl = expl

    task_is_not_running = task.status not in (RUNNING, BATCH_RUNNING)
    task_started_a_run = status in (DONE, FAILED, RUNNING)
    running_on_this_worker = task.worker_running == worker_id
    if task_is_not_running or (task_started_a_run and running_on_this_worker) or new_deps:
        # don't allow re-scheduling of task while it is running, it must either fail or succeed on the worker actually running it
        if status != task.status or status == PENDING:
            # Update the DB only if there was an actual change, to prevent noise.
            # We also check for status == PENDING b/c that's the default value
            # (so checking for status != task.status would lie)
            self._update_task_history(task, status)
        self._state.set_status(task, PENDING if status == SUSPENDED else status, self._config)

    if status == FAILED and self._config.batch_emails:
        batched_params, _ = self._state.get_batcher(worker_id, family)
        if batched_params:
            unbatched_params = {
                param: value
                for param, value in task.params.items()
                if param not in batched_params
            }
        else:
            unbatched_params = task.params
        try:
            expl_raw = json.loads(expl)
        except (TypeError, ValueError):
            # ValueError: expl is a string that is not valid JSON.
            # TypeError: expl is not a string at all (e.g. the default None).
            # Either way fall back to the raw value instead of aborting the
            # call after the status has already been updated above.
            # NOTE(review): the batcher then receives None when no
            # explanation was supplied -- confirm it tolerates that.
            expl_raw = expl

        self._email_batcher.add_failure(
            task.pretty_id, task.family, unbatched_params, expl_raw, owners)
        # set_status above may have turned the failure into DISABLED.
        if task.status == DISABLED:
            self._email_batcher.add_disable(
                task.pretty_id, task.family, unbatched_params, owners)

    if deps is not None:
        task.deps = set(deps)

    if new_deps is not None:
        task.deps.update(new_deps)

    # ``resources`` was normalised to a dict at the top of this method, so
    # this condition always holds and the task's resources are always replaced.
    if resources is not None:
        task.resources = resources

    if worker.enabled and not assistant:
        task.stakeholders.add(worker_id)

        # Task dependencies might not exist yet. Let's create dummy tasks for them for now.
        # Otherwise the task dependencies might end up being pruned if scheduling takes a long time
        for dep in task.deps or []:
            t = self._state.get_task(dep, setdefault=self._make_task(task_id=dep, status=UNKNOWN, deps=None, priority=priority))
            t.stakeholders.add(worker_id)

    self._update_priority(task, priority, worker_id)

    # Because some tasks (non-dynamic dependencies) are `_make_task`ed
    # before we know their retry_policy, we always set it here
    task.retry_policy = retry_policy

    if runnable and status != FAILED and worker.enabled:
        task.workers.add(worker_id)
        self._state.get_worker(worker_id).tasks.add(task)
        task.runnable = runnable