def get_work()

in luigi/scheduler.py [0:0]


    def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, **kwargs):
        """Pick the next task (or batch of tasks) for a worker and describe it in a reply dict.

        :param host: host name of the calling worker; stored in the worker reference and
            passed on to the task history when a single task is started.
        :param assistant: when truthy, the worker is registered with an ``assistant`` entry
            and may also pick up any task whose ``runnable`` attribute is set, not only
            tasks that list this worker.
        :param current_tasks: ids of the tasks the worker reports it is currently handling,
            or ``None``. When given, a RUNNING task assigned to this worker whose id is
            *not* in this collection is handed out again, and
            ``_reset_orphaned_batch_running_tasks`` is called for the worker.
        :param worker: id of the requesting worker. Required; enforced with an ``assert``.
        :param kwargs: accepted but not read by this method.
        :return: a dict. For a disabled worker it is a fixed "no work" reply that also
            carries ``worker_state``. Otherwise it is the result of
            ``self.count_pending(worker_id)`` extended with ``task_id`` and, when work was
            found, ``task_family``, ``task_module`` and ``task_params``. A batch reply has
            ``task_id`` set to ``None`` and adds ``batch_id`` and ``batch_task_ids``.
        """
        # TODO: remove any expired nodes

        # Algo: iterate over all nodes, find the highest priority node with no dependencies and
        # available resources.

        # Resource checking looks both at currently available resources and at which resources would
        # be available if all running tasks died and we rescheduled all workers greedily. We do both
        # checks in order to prevent a worker with many low-priority tasks from starving other
        # workers with higher priority tasks that share the same resources.

        # TODO: remove tasks that can't be done, figure out if the worker has absolutely
        # nothing it can wait for

        if self._config.prune_on_get_work:
            self.prune()

        assert worker is not None
        # From here on `worker_id` is the id passed in by the caller and `worker` is the
        # worker object returned by the scheduler state.
        worker_id = worker
        worker = self._update_worker(
            worker_id,
            worker_reference={'host': host},
            get_work=True)
        if not worker.enabled:
            # Disabled workers get an empty reply without any scheduling work being done.
            reply = {'n_pending_tasks': 0,
                     'running_tasks': [],
                     'task_id': None,
                     'n_unique_pending': 0,
                     'worker_state': worker.state,
                     }
            return reply

        if assistant:
            self.add_worker(worker_id, [('assistant', assistant)])

        # Batch bookkeeping: `batched_params` maps each batched parameter name to the list of
        # values collected so far, `unbatched_params` holds the parameters every batch member
        # must share, and `batched_tasks` is the list of tasks in the batch.
        batched_params, unbatched_params, batched_tasks, max_batch_size = None, None, [], 1
        best_task = None
        if current_tasks is not None:
            # A task that is RUNNING on this worker but missing from the worker's own report
            # is selected again. Iteration is in ascending `_rank` order and every match
            # overwrites the previous one, so the last match wins.
            ct_set = set(current_tasks)
            for task in sorted(self._state.get_active_tasks_by_status(RUNNING), key=self._rank):
                if task.worker_running == worker_id and task.id not in ct_set:
                    best_task = task

        if current_tasks is not None:
            # batch running tasks that weren't claimed since the last get_work go back in the pool
            self._reset_orphaned_batch_running_tasks(worker_id)

        # Resources that would be in use under the hypothetical greedy rescheduling described
        # at the top of this method.
        greedy_resources = collections.defaultdict(int)

        worker = self._state.get_worker(worker_id)
        if self._paused:
            # NOTE: `used_resources` and `greedy_workers` are left unbound on this path. That
            # is safe only because `tasks` is empty, so the loop below never reads them.
            relevant_tasks = []
        elif worker.is_trivial_worker(self._state):
            relevant_tasks = worker.get_tasks(self._state, PENDING, RUNNING)
            used_resources = collections.defaultdict(int)
            greedy_workers = dict()  # If there's no resources, then they can grab any task
        else:
            relevant_tasks = self._state.get_active_tasks_by_status(PENDING, RUNNING)
            used_resources = self._used_resources()
            # Only workers that called get_work after `activity_limit` take part in the
            # greedy simulation; each starts with `info['workers']` free slots (default 1).
            activity_limit = time.time() - self._config.worker_disconnect_delay
            active_workers = self._state.get_active_workers(last_get_work_gt=activity_limit)
            greedy_workers = dict((worker.id, worker.info.get('workers', 1))
                                  for worker in active_workers)
        # Walk the candidates in descending `_rank` order.
        tasks = list(relevant_tasks)
        tasks.sort(key=self._rank, reverse=True)

        for task in tasks:
            # Once a batchable best task has been chosen, extend the batch with later tasks of
            # the same family that are batchable and schedulable, agree on every unbatched
            # parameter and have equal resources, until `max_batch_size` is reached.
            if (best_task and batched_params and task.family == best_task.family and
                    len(batched_tasks) < max_batch_size and task.is_batchable() and all(
                    task.params.get(name) == value for name, value in unbatched_params.items()) and
                    task.resources == best_task.resources and self._schedulable(task)):
                for name, params in batched_params.items():
                    params.append(task.params.get(name))
                batched_tasks.append(task)
            # After a best task exists the remaining iterations only look for batch members.
            if best_task:
                continue

            # A task already running on a simulated worker occupies one of its slots and its
            # resources (`resources_running` if the attribute exists, else `resources`).
            if task.status == RUNNING and (task.worker_running in greedy_workers):
                greedy_workers[task.worker_running] -= 1
                for resource, amount in (getattr(task, 'resources_running', task.resources) or {}).items():
                    greedy_resources[resource] += amount

            if self._schedulable(task) and self._has_resources(task.resources, greedy_resources):
                in_workers = (assistant and task.runnable) or worker_id in task.workers
                if in_workers and self._has_resources(task.resources, used_resources):
                    # This worker may run the task and the resources are available: select it.
                    best_task = task
                    batch_param_names, max_batch_size = self._state.get_batcher(
                        worker_id, task.family)
                    if batch_param_names and task.is_batchable():
                        try:
                            batched_params = {
                                name: [task.params[name]] for name in batch_param_names
                            }
                            unbatched_params = {
                                name: value for name, value in task.params.items()
                                if name not in batched_params
                            }
                            batched_tasks.append(task)
                        except KeyError:
                            # The task lacks one of the batch parameters: do not batch it.
                            batched_params, unbatched_params = None, None
                else:
                    # The requesting worker cannot take this task now; simulate handing it to
                    # the first candidate worker that still has a free slot so that its
                    # resources count against lower-ranked tasks.
                    workers = itertools.chain(task.workers, [worker_id]) if assistant else task.workers
                    for task_worker in workers:
                        if greedy_workers.get(task_worker, 0) > 0:
                            # use up a worker
                            greedy_workers[task_worker] -= 1

                            # keep track of the resources used in greedy scheduling
                            for resource, amount in (task.resources or {}).items():
                                greedy_resources[resource] += amount

                            break

        reply = self.count_pending(worker_id)

        # A batch needs at least two tasks; a "batch" of one falls through to the single-task
        # branch below.
        if len(batched_tasks) > 1:
            # The batch id is the md5 hex digest of the '|'-joined task ids; the hash is
            # requested with usedforsecurity=False.
            batch_string = '|'.join(task.id for task in batched_tasks)
            batch_id = hashlib.new('md5', batch_string.encode('utf-8'), usedforsecurity=False).hexdigest()
            for task in batched_tasks:
                self._state.set_batch_running(task, batch_id, worker_id)

            # Start from the best task's params and replace each batched parameter with the
            # list of values collected from all batch members.
            combined_params = best_task.params.copy()
            combined_params.update(batched_params)

            reply['task_id'] = None
            reply['task_family'] = best_task.family
            reply['task_module'] = getattr(best_task, 'module', None)
            reply['task_params'] = combined_params
            reply['batch_id'] = batch_id
            reply['batch_task_ids'] = [task.id for task in batched_tasks]

        elif best_task:
            # Single task: mark it RUNNING on this worker and snapshot the resources and
            # start time it is running with.
            self.update_metrics_task_started(best_task)
            self._state.set_status(best_task, RUNNING, self._config)
            best_task.worker_running = worker_id
            best_task.resources_running = best_task.resources.copy()
            best_task.time_running = time.time()
            self._update_task_history(best_task, RUNNING, host=host)

            reply['task_id'] = best_task.id
            reply['task_family'] = best_task.family
            reply['task_module'] = getattr(best_task, 'module', None)
            reply['task_params'] = best_task.params

        else:
            # Nothing to hand out.
            reply['task_id'] = None

        return reply