def add_task()

in luigi/scheduler.py [0:0]


    def add_task(self, task_id=None, status=PENDING, runnable=True,
                 deps=None, new_deps=None, expl=None, resources=None,
                 priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False,
                 assistant=False, tracking_url=None, worker=None, batchable=None,
                 batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
        """
        Register a task with the scheduler, or update an already known one.

        * add task identified by task_id if it doesn't exist
        * if deps is not None, update dependency list
        * update status of task
        * add additional workers/stakeholders
        * update priority when needed

        :param task_id: identifier of the task to add or update.
        :param status: status reported for the task. ``SUSPENDED`` is stored as ``PENDING``.
        :param runnable: if truthy (and the status is not ``FAILED`` and the worker is
            enabled), the reporting worker is registered on the task and the task is
            marked runnable.
        :param deps: if not None, replaces the task's dependency set.
        :param new_deps: if not None, added to the task's dependency set. A truthy value
            also allows the status update to go through while the task is running.
        :param expl: explanation stored on the task (and on the running tasks of its
            batch). For batched failure e-mails it is JSON-decoded when possible and
            passed through unchanged otherwise.
        :param resources: resource mapping for the task; copied before use.
        :param priority: priority used for the task and for placeholder dependency tasks.
        :param family: task family; only applied if the stored task has none yet.
        :param module: task module; only applied if the stored task has none yet.
        :param params: task parameters; only applied if the stored task has none yet.
        :param param_visibilities: parameter visibilities; only applied if the stored
            task has none yet.
        :param accepts_messages: if not None, stored on the task.
        :param assistant: if truthy, the worker is not added as a stakeholder and no
            placeholder tasks are created for the dependencies.
        :param tracking_url: stored on the task when not None, or whenever the task is
            not currently ``RUNNING``.
        :param worker: id of the reporting worker; must not be None.
        :param batchable: if not None, stored on the task.
        :param batch_id: if not None, stored on the task.
        :param retry_policy_dict: options passed to ``_generate_retry_policy``; an empty
            dict is used when None.
        :param owners: forwarded to the e-mail batcher for failure/disable notifications.
        :param kwargs: accepted and ignored.
        :return: None.
        """
        assert worker is not None
        worker_id = worker
        # From here on `worker` is the worker object, `worker_id` its identifier.
        worker = self._update_worker(worker_id)

        # Never keep a reference to the caller's dict.
        resources = {} if resources is None else resources.copy()

        if retry_policy_dict is None:
            retry_policy_dict = {}

        retry_policy = self._generate_retry_policy(retry_policy_dict)

        # Only enabled workers may cause a new task to be created; a disabled
        # worker can only touch a task that is already known to the state.
        if worker.enabled:
            _default_task = self._make_task(
                task_id=task_id, status=PENDING, deps=deps, resources=resources,
                priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities,
            )
        else:
            _default_task = None

        task = self._state.get_task(task_id, setdefault=_default_task)

        # A disabled worker is only allowed to report on a task that is currently RUNNING.
        if task is None or (task.status != RUNNING and not worker.enabled):
            return

        # Ignore claims that the task is PENDING if it very recently was marked as DONE.
        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
            return

        # for setting priority, we'll sometimes create tasks with unset family and params
        if not task.family:
            task.family = family
        if not getattr(task, 'module', None):
            task.module = module
        if not getattr(task, 'param_visibilities', None):
            task.param_visibilities = _get_default(param_visibilities, {})
        if not task.params:
            task.set_params(params)

        if batch_id is not None:
            task.batch_id = batch_id
        if status == RUNNING and not task.worker_running:
            task.worker_running = worker_id
            if batch_id:
                # copy resources_running of the first batch task
                # NOTE(review): assumes at least one running batch task exists for
                # this batch_id; an empty result would raise IndexError - confirm.
                batch_tasks = self._state.get_batch_running_tasks(batch_id)
                task.resources_running = batch_tasks[0].resources_running.copy()
            task.time_running = time.time()

        if accepts_messages is not None:
            task.accepts_messages = accepts_messages

        # A missing tracking_url does not clear the URL of a task that is RUNNING;
        # for any other status the stored URL is overwritten (possibly with None).
        if tracking_url is not None or task.status != RUNNING:
            task.tracking_url = tracking_url
            if task.batch_id is not None:
                for batch_task in self._state.get_batch_running_tasks(task.batch_id):
                    batch_task.tracking_url = tracking_url

        if batchable is not None:
            task.batchable = batchable

        if task.remove is not None:
            task.remove = None  # unmark task for removal so it isn't removed after being added

        if expl is not None:
            task.expl = expl
            if task.batch_id is not None:
                for batch_task in self._state.get_batch_running_tasks(task.batch_id):
                    batch_task.expl = expl

        task_is_not_running = task.status not in (RUNNING, BATCH_RUNNING)
        task_started_a_run = status in (DONE, FAILED, RUNNING)
        running_on_this_worker = task.worker_running == worker_id
        if task_is_not_running or (task_started_a_run and running_on_this_worker) or new_deps:
            # don't allow re-scheduling of task while it is running, it must either fail or succeed on the worker actually running it
            if status != task.status or status == PENDING:
                # Update the DB only if there was an actual change, to prevent noise.
                # We also check for status == PENDING b/c that's the default value
                # (so checking for status != task.status would lie)
                self._update_task_history(task, status)
            self._state.set_status(task, PENDING if status == SUSPENDED else status, self._config)

        if status == FAILED and self._config.batch_emails:
            batched_params, _ = self._state.get_batcher(worker_id, family)
            if batched_params:
                # Report only the parameters that are not part of the batch.
                unbatched_params = {
                    param: value
                    for param, value in task.params.items()
                    if param not in batched_params
                }
            else:
                unbatched_params = task.params
            try:
                expl_raw = json.loads(expl)
            except (ValueError, TypeError):
                # ValueError: expl is not valid JSON.
                # TypeError: expl is None (the default) or otherwise not a string;
                # a failure reported without an explanation must not crash here.
                expl_raw = expl

            self._email_batcher.add_failure(
                task.pretty_id, task.family, unbatched_params, expl_raw, owners)
            # set_status above may have moved the task to DISABLED.
            if task.status == DISABLED:
                self._email_batcher.add_disable(
                    task.pretty_id, task.family, unbatched_params, owners)

        if deps is not None:
            task.deps = set(deps)

        if new_deps is not None:
            task.deps.update(new_deps)

        # NOTE(review): `resources` was normalised to a dict at the top of this
        # method, so this condition is always true and the task's resources are
        # overwritten on every call - confirm whether that is intended.
        if resources is not None:
            task.resources = resources

        if worker.enabled and not assistant:
            task.stakeholders.add(worker_id)

            # Task dependencies might not exist yet. Let's create dummy tasks for them for now.
            # Otherwise the task dependencies might end up being pruned if scheduling takes a long time
            for dep in task.deps or []:
                t = self._state.get_task(dep, setdefault=self._make_task(task_id=dep, status=UNKNOWN, deps=None, priority=priority))
                t.stakeholders.add(worker_id)

        self._update_priority(task, priority, worker_id)

        # Because some tasks (non-dynamic dependencies) are `_make_task`ed
        # before we know their retry_policy, we always set it here
        task.retry_policy = retry_policy

        if runnable and status != FAILED and worker.enabled:
            task.workers.add(worker_id)
            self._state.get_worker(worker_id).tasks.add(task)
            task.runnable = runnable