in luigi/worker.py [0:0]
def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs):
    """
    Set up the worker's identity, configuration and bookkeeping state.

    :param scheduler: scheduler to use; a fresh ``Scheduler()`` is created when ``None``.
    :param worker_id: explicit worker id; when falsy, the configured id is used and, failing that,
                      an id is generated from the worker info.
    :param worker_processes: stored, after ``int()`` coercion, as ``self.worker_processes``.
    :param assistant: stored as ``self._assistant``.
    :param kwargs: forwarded unchanged to the ``worker`` config object.
    :raises AssertionError: if the configured ``wait_interval`` is below ``_WAIT_INTERVAL_EPS`` or the
                            configured ``wait_jitter`` is negative.
    """
    scheduler = Scheduler() if scheduler is None else scheduler

    self.worker_processes = int(worker_processes)
    self._worker_info = self._generate_worker_info()
    self._config = worker(**kwargs)

    # Id precedence: explicit argument, then configured id, then a generated one.
    resolved_id = worker_id or self._config.id
    if not resolved_id:
        resolved_id = self._generate_worker_id(self._worker_info)

    assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive"
    assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero"

    # Identity and collaborators.
    self._id = resolved_id
    self._scheduler = scheduler
    self._assistant = assistant
    self._stop_requesting_work = False
    self.host = socket.gethostname()

    # Task bookkeeping.
    self._scheduled_tasks = {}
    self._suspended_tasks = {}
    self._batch_running_tasks = {}
    self._batch_families_sent = set()
    self._first_task = None

    self.add_succeeded = self.run_succeeded = True
    self.unfulfilled_counts = collections.defaultdict(int)

    # ``signal.signal(signal.SIGUSR1, fn)`` only works inside the main execution thread, so installing
    # the hook can be switched off through the config.
    install_shutdown_handler = not self._config.no_install_shutdown_handler
    if install_shutdown_handler:
        try:
            usr1 = signal.SIGUSR1
            signal.signal(usr1, self.handle_interrupt)
            signal.siginterrupt(usr1, False)
        except AttributeError:
            # Best effort: skip the hook when a required attribute is missing.
            pass

    # Track which tasks are running (they could be in other processes).
    self._task_result_queue = multiprocessing.Queue()
    self._running_tasks = {}
    self._idle_since = None

    # mp-safe dictionary for caching completion checks across task processes; stays ``None`` unless
    # the config enables it.
    completion_cache = None
    if self._config.cache_task_completion:
        completion_cache = multiprocessing.Manager().dict()
    self._task_completion_cache = completion_cache

    # History kept for execution_summary.
    self._add_task_history = []
    self._get_work_response_history = []