in luigi/scheduler.py [0:0]
def set_status(self, task, new_status, config=None):
    """Move ``task`` to ``new_status``, applying the scheduler's transition rules.

    The request is silently ignored (the method returns without changes) when:

    * ``new_status`` is DISABLED while the task is RUNNING or BATCH_RUNNING;
    * the task is DISABLED with ``scheduler_disable_time`` set and the
      requested status is neither DONE nor DISABLED.

    Otherwise, in order:

    * a DISABLED task that is reported DONE is passed to ``self.re_enable``;
    * a RUNNING task carrying a ``batch_id`` that leaves RUNNING propagates
      the same status to every task returned by
      ``self.get_batch_running_tasks`` and clears the batch ids;
    * a FAILED report for a task that is not DISABLED records a failure; if
      ``task.has_excessive_failures()`` then holds, the effective status
      becomes DISABLED, ``scheduler_disable_time`` is stamped, and an error
      e-mail is sent unless ``config.batch_emails`` is truthy;
    * a DISABLED request that did not come through the failure path above
      resets ``scheduler_disable_time`` to ``None``;
    * if the effective status differs from the current one, the task is
      moved between the ``self._status_tasks`` buckets, ``task.updated`` is
      refreshed and ``self.update_metrics`` is called;
    * if the effective status is still FAILED, ``task.retry`` is scheduled
      ``config.retry_delay`` seconds from now, and ``task.remove`` is set
      when the task had a batch id but was not batchable on entry.

    :param task: the task object whose state is being changed.
    :param new_status: the requested status constant.
    :param config: scheduler configuration; must not be ``None`` when
        ``new_status`` is FAILED.
    :raises AssertionError: if ``new_status`` is FAILED and ``config`` is
        ``None``.
    """
    if new_status == FAILED:
        # The failure path below reads several settings from ``config``.
        # NOTE(review): ``assert`` is stripped under ``python -O``; confirm
        # whether an explicit check is wanted here.
        assert config is not None

    # A task that is executing (alone or as part of a batch) is not disabled.
    if new_status == DISABLED and task.status in (RUNNING, BATCH_RUNNING):
        return

    # Captured up front: the batch handling further down resets batch_id.
    expire_after_failure = task.batch_id is not None and not task.batchable

    currently_disabled = task.status == DISABLED
    if currently_disabled and new_status == DONE:
        self.re_enable(task)
    elif (currently_disabled
          and task.scheduler_disable_time is not None
          and new_status != DISABLED):
        # don't allow workers to override a scheduler disable
        return

    leaving_running_batch = (
        task.status == RUNNING
        and task.batch_id is not None
        and new_status != RUNNING
    )
    if leaving_running_batch:
        # Give every task of the running batch the same status, then
        # detach it from the batch.
        for member in self.get_batch_running_tasks(task.batch_id):
            self.set_status(member, new_status, config)
            member.batch_id = None
        task.batch_id = None

    if new_status == FAILED and task.status != DISABLED:
        task.add_failure()
        if task.has_excessive_failures():
            # Too many failures: the FAILED request is turned into a disable.
            task.scheduler_disable_time = time.time()
            new_status = DISABLED
            if not config.batch_emails:
                subject = 'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(
                    task=task.id,
                )
                message = (
                    '{task} failed {failures} times in the last {window} seconds, so it is being '
                    'disabled for {persist} seconds'
                ).format(
                    failures=task.retry_policy.retry_count,
                    task=task.id,
                    window=task.retry_policy.disable_window,
                    persist=config.disable_persist,
                )
                notifications.send_error_email(subject, message)
    elif new_status == DISABLED:
        # A directly requested disable carries no disable timestamp.
        task.scheduler_disable_time = None

    if new_status != task.status:
        # Re-file the task under its new status and note the change time.
        tasks_by_status = self._status_tasks
        tasks_by_status[task.status].pop(task.id)
        tasks_by_status[new_status][task.id] = task
        task.status = new_status
        task.updated = time.time()
        self.update_metrics(task, config)

    # Only reached with FAILED when the failure did not escalate to DISABLED.
    if new_status == FAILED:
        task.retry = time.time() + config.retry_delay
        if expire_after_failure:
            task.remove = time.time()