def _traverse_graph()

in luigi/scheduler.py [0:0]


    def _traverse_graph(self, root_task_id, seen=None, dep_func=None, include_done=True):
        """ Returns the dependency graph rooted at task_id

        This does a breadth-first traversal to find the nodes closest to the
        root before hitting the scheduler.max_graph_nodes limit.

        :param root_task_id: the id of the graph's root
        :return: A map of task id to serialized node
        """

        if seen is None:
            seen = set()
        elif root_task_id in seen:
            return {}

        if dep_func is None:
            def dep_func(t):
                return t.deps

        seen.add(root_task_id)
        serialized = {}
        queue = collections.deque([root_task_id])
        while queue:
            task_id = queue.popleft()

            task = self._state.get_task(task_id)
            if task is None or not task.family:
                logger.debug('Missing task for id [%s]', task_id)

                # NOTE : If a dependency is missing from self._state there is no way to deduce the
                #        task family and parameters.
                family_match = TASK_FAMILY_RE.match(task_id)
                family = family_match.group(1) if family_match else UNKNOWN
                params = {'task_id': task_id}
                serialized[task_id] = {
                    'deps': [],
                    'status': UNKNOWN,
                    'workers': [],
                    'start_time': UNKNOWN,
                    'params': params,
                    'name': family,
                    'display_name': task_id,
                    'priority': 0,
                }
            else:
                deps = dep_func(task)
                if not include_done:
                    deps = list(self._filter_done(deps))
                serialized[task_id] = self._serialize_task(task_id, deps=deps)
                for dep in sorted(deps):
                    if dep not in seen:
                        seen.add(dep)
                        queue.append(dep)

            if task_id != root_task_id:
                del serialized[task_id]['display_name']
            if len(serialized) >= self._config.max_graph_nodes:
                break

        return serialized