def run()

in luigi/worker.py [0:0]


    def run(self):
        logger.info('[pid %s] Worker %s running   %s', os.getpid(), self.worker_id, self.task)

        if self.use_multiprocessing:
            # Need to have different random seeds if running in separate processes
            processID = os.getpid()
            currentTime = time.time()
            random.seed(processID * currentTime)

        status = FAILED
        expl = ''
        missing = []
        new_deps = []
        try:
            # Verify that all the tasks are fulfilled! For external tasks we
            # don't care about unfulfilled dependencies, because we are just
            # checking completeness of self.task so outputs of dependencies are
            # irrelevant.
            if self.check_unfulfilled_deps and not _is_external(self.task):
                missing = []
                for dep in self.task.deps():
                    if not self.check_complete(dep):
                        nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
                        if nonexistent_outputs:
                            missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
                        else:
                            missing.append(dep.task_id)
                if missing:
                    deps = 'dependency' if len(missing) == 1 else 'dependencies'
                    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
            self.task.trigger_event(Event.START, self.task)
            t0 = time.time()
            status = None

            if _is_external(self.task):
                # External task
                if self.check_complete(self.task):
                    status = DONE
                else:
                    status = FAILED
                    expl = 'Task is an external data dependency ' \
                        'and data does not exist (yet?).'
            else:
                with self._forward_attributes():
                    new_deps = self._run_get_new_deps()
                if not new_deps:
                    if not self.check_complete_on_run:
                        # update the cache
                        if self.task_completion_cache is not None:
                            self.task_completion_cache[self.task.task_id] = True
                        status = DONE
                    elif self.check_complete(self.task):
                        status = DONE
                    else:
                        raise TaskException("Task finished running, but complete() is still returning false.")
                else:
                    status = PENDING

            if new_deps:
                logger.info(
                    '[pid %s] Worker %s new requirements      %s',
                    os.getpid(), self.worker_id, self.task)
            elif status == DONE:
                self.task.trigger_event(
                    Event.PROCESSING_TIME, self.task, time.time() - t0)
                expl = self.task.on_success()
                logger.info('[pid %s] Worker %s done      %s', os.getpid(),
                            self.worker_id, self.task)
                self.task.trigger_event(Event.SUCCESS, self.task)

        except KeyboardInterrupt:
            raise
        except BaseException as ex:
            status = FAILED
            expl = self._handle_run_exception(ex)

        finally:
            self.result_queue.put(
                (self.task.task_id, status, expl, missing, new_deps))