def _get_work()

in luigi/worker.py [0:0]


    def _get_work(self):
        """
        Ask the scheduler for work and wrap its reply in a ``GetWorkResponse``.

        If ``self._stop_requesting_work`` is set, the scheduler is not contacted
        and a response carrying ``WORKER_STATE_DISABLED`` is returned. Otherwise
        the scheduler's ``get_work`` is called when ``self.worker_processes > 0``,
        and ``count_pending`` is called when it is not.

        Side effects visible in this method:

        * every scheduler reply is recorded in ``self._get_work_response_history``;
        * a task id that is not in ``self._scheduled_tasks`` is loaded with
          ``load_task`` and stored there; if loading raises
          ``TaskClassException`` the error is passed to
          ``self._handle_task_load_error``, ``self.run_succeeded`` is set to
          ``False`` and the response is returned with ``task_id=None``;
        * when the reply contains ``batch_task_ids``, the locally known tasks
          for those ids are stored as a list in
          ``self._batch_running_tasks[task_id]``.

        :return: a ``GetWorkResponse`` built from the scheduler reply.
        """
        if self._stop_requesting_work:
            return GetWorkResponse(None, 0, 0, 0, 0, WORKER_STATE_DISABLED)

        if self.worker_processes > 0:
            logger.debug("Asking scheduler for work...")
            r = self._scheduler.get_work(
                worker=self._id,
                host=self.host,
                assistant=self._assistant,
                current_tasks=list(self._running_tasks.keys()),
            )
        else:
            logger.debug("Checking if tasks are still pending")
            r = self._scheduler.count_pending(worker=self._id)

        running_tasks = r['running_tasks']
        task_id = self._get_work_task_id(r)

        self._get_work_response_history.append({
            'task_id': task_id,
            'running_tasks': running_tasks,
        })

        if task_id is not None and task_id not in self._scheduled_tasks:
            logger.info('Did not schedule %s, will load it dynamically', task_id)

            try:
                # TODO: we should obtain the module name from the server!
                loaded_task = load_task(module=r.get('task_module'),
                                        task_name=r['task_family'],
                                        params_str=r['task_params'])
            except TaskClassException as ex:
                self._handle_task_load_error(ex, [task_id])
                task_id = None
                self.run_succeeded = False
            else:
                self._scheduled_tasks[task_id] = loaded_task

        if task_id is not None and 'batch_task_ids' in r:
            # Store a real list rather than a lazy ``filter`` object: on
            # Python 3 ``filter`` yields a one-shot iterator, which would be
            # silently empty on a second pass and has no usable len()/bool().
            # Batch ids with no (truthy) entry in _scheduled_tasks are skipped.
            known_tasks = (
                self._scheduled_tasks.get(batch_id) for batch_id in r['batch_task_ids']
            )
            self._batch_running_tasks[task_id] = [task for task in known_tasks if task]

        return GetWorkResponse(
            task_id=task_id,
            running_tasks=running_tasks,
            n_pending_tasks=r['n_pending_tasks'],
            n_unique_pending=r['n_unique_pending'],

            # TODO: For a tiny amount of time (a month?) we'll keep forwards compatibility
            #  That is, you can use a newer client than server (Sep 2016)
            n_pending_last_scheduled=r.get('n_pending_last_scheduled', 0),
            worker_state=r.get('worker_state', WORKER_STATE_ACTIVE),
        )