in luigi/scheduler.py [0:0]
def task_list(self, status='', upstream_status='', limit=True, search=None, max_shown_tasks=None,
**kwargs):
"""
Query for a subset of tasks by status.
"""
if not search:
count_limit = max_shown_tasks or self._config.max_shown_tasks
pre_count = self._state.get_active_task_count_for_status(status)
if limit and pre_count > count_limit:
return {'num_tasks': -1 if upstream_status else pre_count}
self.prune()
result = {}
upstream_status_table = {} # used to memoize upstream status
if search is None:
def filter_func(_):
return True
else:
terms = search.split()
def filter_func(t):
return all(term.casefold() in t.pretty_id.casefold() for term in terms)
tasks = self._state.get_active_tasks_by_status(status) if status else self._state.get_active_tasks()
for task in filter(filter_func, tasks):
if task.status != PENDING or not upstream_status or upstream_status == self._upstream_status(task.id, upstream_status_table):
serialized = self._serialize_task(task.id, include_deps=False)
result[task.id] = serialized
if limit and len(result) > (max_shown_tasks or self._config.max_shown_tasks):
return {'num_tasks': len(result)}
return result