in luigi/scheduler.py [0:0]
def _traverse_graph(self, root_task_id, seen=None, dep_func=None, include_done=True):
    """Serialize the dependency graph reachable from ``root_task_id``.

    Nodes are visited breadth-first, so when the graph is larger than
    ``self._config.max_graph_nodes`` the nodes kept are the ones closest to
    the root; traversal stops as soon as that many nodes are serialized.

    :param root_task_id: id of the task the traversal starts from.
    :param seen: optional set of task ids already visited. It is mutated in
        place. If it already contains ``root_task_id``, nothing is traversed
        and ``{}`` is returned.
    :param dep_func: callable mapping a task object to the ids to follow from
        it; defaults to reading the task's ``deps`` attribute.
    :param include_done: when False, the ids produced by ``dep_func`` are
        first passed through ``self._filter_done`` before being serialized
        and followed.
    :return: dict mapping task id to its serialized node. Every node other
        than the root has its ``'display_name'`` entry removed.
    """
    # A caller-supplied ``seen`` that already holds the root means this
    # subgraph was traversed before.
    if seen is not None and root_task_id in seen:
        return {}
    visited = set() if seen is None else seen

    if dep_func is None:
        def dep_func(task_obj):
            return task_obj.deps

    visited.add(root_task_id)
    graph = {}
    pending = collections.deque([root_task_id])

    while pending:
        current_id = pending.popleft()
        task = self._state.get_task(current_id)

        if task is not None and task.family:
            neighbours = dep_func(task)
            if not include_done:
                neighbours = list(self._filter_done(neighbours))
            node = self._serialize_task(current_id, deps=neighbours)
            graph[current_id] = node
            # Sorting fixes the enqueue order, so which nodes survive the
            # max_graph_nodes cut-off is deterministic.
            for dep_id in sorted(neighbours):
                if dep_id in visited:
                    continue
                visited.add(dep_id)
                pending.append(dep_id)
        else:
            logger.debug('Missing task for id [%s]', current_id)
            # NOTE : If a dependency is missing from self._state there is no way to deduce the
            # task family and parameters, so a placeholder node is built from the id alone.
            id_match = TASK_FAMILY_RE.match(current_id)
            if id_match:
                family = id_match.group(1)
            else:
                family = UNKNOWN
            node = {
                'deps': [],
                'status': UNKNOWN,
                'workers': [],
                'start_time': UNKNOWN,
                'params': {'task_id': current_id},
                'name': family,
                'display_name': current_id,
                'priority': 0,
            }
            graph[current_id] = node

        # Only the root node keeps 'display_name'.
        # NOTE(review): presumably done to shrink the payload; confirm.
        if current_id != root_task_id:
            del node['display_name']
        if len(graph) >= self._config.max_graph_nodes:
            break

    return graph