in luigi/worker.py [0:0]
def run(self):
    """Execute ``self.task`` and report the outcome on ``self.result_queue``.

    Steps, in order:

    1. If ``self.check_unfulfilled_deps`` is set and the task is not
       external, check every dependency with ``self.check_complete``.
       Incomplete ones are collected in ``missing`` (annotated with their
       non-existent outputs, when there are any) and a ``RuntimeError`` is
       raised.
    2. Fire ``Event.START``. An external task is only checked for
       completeness. Any other task is run via ``_run_get_new_deps``; it
       ends up PENDING if it yields new requirements, and DONE otherwise
       (after an optional ``check_complete``).
    3. On DONE, fire ``Event.PROCESSING_TIME`` with the elapsed seconds,
       take ``expl`` from ``on_success()``, and fire ``Event.SUCCESS``.

    Any exception other than ``KeyboardInterrupt`` becomes a FAILED status
    whose explanation comes from ``self._handle_run_exception``.
    ``KeyboardInterrupt`` is re-raised. In every case the tuple
    ``(task_id, status, expl, missing, new_deps)`` is put on
    ``self.result_queue``.
    """
    logger.info('[pid %s] Worker %s running %s', os.getpid(), self.worker_id, self.task)

    if self.use_multiprocessing:
        # Separate processes need distinct random seeds, so mix the pid
        # with the current time.
        pid = os.getpid()
        now = time.time()
        random.seed(pid * now)

    # Values reported if something fails before they are overwritten.
    status = FAILED
    expl = ''
    missing = []
    new_deps = []
    try:
        # For external tasks only the completeness of self.task itself
        # matters, so their dependencies' outputs are not checked.
        if self.check_unfulfilled_deps and not _is_external(self.task):
            for dependency in self.task.deps():
                if self.check_complete(dependency):
                    continue
                absent = [target for target in flatten(dependency.output()) if not target.exists()]
                missing.append(
                    f'{dependency.task_id} ({", ".join(map(str, absent))})' if absent
                    else dependency.task_id
                )
            if missing:
                noun = 'dependencies' if len(missing) > 1 else 'dependency'
                raise RuntimeError('Unfulfilled %s at run time: %s' % (noun, ', '.join(missing)))

        self.task.trigger_event(Event.START, self.task)
        started_at = time.time()
        status = None  # every path below either assigns a status or raises

        if _is_external(self.task):
            # External task: nothing to run, only check whether it is complete.
            if not self.check_complete(self.task):
                status = FAILED
                expl = ('Task is an external data dependency '
                        'and data does not exist (yet?).')
            else:
                status = DONE
        else:
            with self._forward_attributes():
                new_deps = self._run_get_new_deps()
            if new_deps:
                # The run yielded new requirements; report the task as PENDING.
                status = PENDING
            elif not self.check_complete_on_run:
                # Skip the completeness check and mark the task complete in
                # the completion cache, when one is configured.
                if self.task_completion_cache is not None:
                    self.task_completion_cache[self.task.task_id] = True
                status = DONE
            elif self.check_complete(self.task):
                status = DONE
            else:
                raise TaskException("Task finished running, but complete() is still returning false.")

        if new_deps:
            logger.info(
                '[pid %s] Worker %s new requirements %s',
                os.getpid(), self.worker_id, self.task)
        elif status == DONE:
            elapsed = time.time() - started_at
            self.task.trigger_event(Event.PROCESSING_TIME, self.task, elapsed)
            expl = self.task.on_success()
            logger.info('[pid %s] Worker %s done %s', os.getpid(), self.worker_id, self.task)
            self.task.trigger_event(Event.SUCCESS, self.task)
    except KeyboardInterrupt:
        raise
    except BaseException as exc:
        status = FAILED
        expl = self._handle_run_exception(exc)
    finally:
        # NOTE(review): if a KeyboardInterrupt escapes after the START event
        # but before a status is assigned, status is still None here.
        self.result_queue.put((self.task.task_id, status, expl, missing, new_deps))