def set_status()

in luigi/scheduler.py [0:0]


    def set_status(self, task, new_status, config=None):
        """Transition ``task`` to ``new_status`` and update related bookkeeping.

        The request is ignored (nothing is modified) when:

        * ``new_status`` is DISABLED while the task is RUNNING or
          BATCH_RUNNING;
        * the task is DISABLED with ``scheduler_disable_time`` set and the
          requested status is neither DONE nor DISABLED.

        Otherwise, in order:

        * a DISABLED task that receives DONE is first handed to
          ``self.re_enable``;
        * a RUNNING task that carries a batch id and leaves RUNNING has the
          same status applied to every task returned by
          ``self.get_batch_running_tasks``, and the batch ids of those tasks
          and of ``task`` are reset to ``None``;
        * FAILED on a task that is not DISABLED records a failure via
          ``task.add_failure()``; if ``task.has_excessive_failures()`` the
          task is switched to DISABLED instead, ``scheduler_disable_time`` is
          stamped, and an error email is sent unless ``config.batch_emails``
          is truthy;
        * a requested DISABLED clears ``scheduler_disable_time``;
        * if the resulting status differs from the task's current one, the
          task is moved between the ``self._status_tasks`` buckets,
          ``task.updated`` is refreshed and ``self.update_metrics`` is called;
        * if the resulting status is FAILED, ``task.retry`` is set to now plus
          ``config.retry_delay``; ``task.remove`` is also set to now when the
          task had a batch id on entry but is not batchable.

        :param task: task object whose status is being changed.
        :param new_status: the requested status constant.
        :param config: object providing ``batch_emails``, ``disable_persist``
            and ``retry_delay``; must not be ``None`` when ``new_status`` is
            FAILED.
        """
        # The FAILED paths below dereference config, so insist on it up front.
        assert new_status != FAILED or config is not None

        # A task that is currently executing is not disabled through this path.
        if new_status == DISABLED:
            if task.status in (RUNNING, BATCH_RUNNING):
                return

        # Captured on entry because task.batch_id may be reset further down.
        drop_after_failure = not (task.batch_id is None or task.batchable)

        if task.status == DISABLED:
            if new_status == DONE:
                self.re_enable(task)
            elif task.scheduler_disable_time is not None:
                # don't allow workers to override a scheduler disable
                if new_status != DISABLED:
                    return

        if task.status == RUNNING and task.batch_id is not None:
            if new_status != RUNNING:
                # Push the new status to the rest of the batch, then dissolve it.
                for member in self.get_batch_running_tasks(task.batch_id):
                    self.set_status(member, new_status, config)
                    member.batch_id = None
                task.batch_id = None

        if new_status == FAILED and task.status != DISABLED:
            task.add_failure()
            if task.has_excessive_failures():
                # Too many failures: the task ends up DISABLED rather than FAILED.
                task.scheduler_disable_time = time.time()
                new_status = DISABLED
                if not config.batch_emails:
                    subject = 'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(
                        task=task.id,
                    )
                    message = (
                        '{task} failed {failures} times in the last {window} seconds, so it is being '
                        'disabled for {persist} seconds'
                    ).format(
                        failures=task.retry_policy.retry_count,
                        task=task.id,
                        window=task.retry_policy.disable_window,
                        persist=config.disable_persist,
                    )
                    notifications.send_error_email(subject, message)
        elif new_status == DISABLED:
            # A directly requested disable carries no scheduler timestamp.
            task.scheduler_disable_time = None

        if new_status != task.status:
            # Re-file the task under its new status bucket.
            self._status_tasks[task.status].pop(task.id)
            self._status_tasks[new_status][task.id] = task
            task.status = new_status
            task.updated = time.time()
            self.update_metrics(task, config)

        if new_status != FAILED:
            return

        task.retry = time.time() + config.retry_delay
        if drop_after_failure:
            task.remove = time.time()