in graph/utils/heron.py [0:0]
def get_all_paths(graph_client: GremlinClient, topology_id: str) -> List[List[str]]:
    """ Return every spout-to-sink task path of the given topology.

    This function first gets from gremlin all spout vertices of the topology
    that have at least one outgoing ``logically_connected`` edge. It then walks
    the graph breadth-first from those spouts, building a dictionary that maps
    each task ID to the task IDs directly downstream of it. That dictionary and
    the spout task IDs are passed to ``path_helper``, which calculates all paths
    from the spouts to the sinks.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        topology_id (str): The topology ID string.

    Returns:
        All possible paths from sources to sinks, as produced by
        ``path_helper``.
    """
    LOG.info("Graph size: %d vertices %d edges",
             graph_client.graph_traversal.V().count().toList()[0],
             graph_client.graph_traversal.E().count().toList()[0])

    conn_type = "logically_connected"
    start: dt.datetime = dt.datetime.now()

    spouts = graph_client.graph_traversal.V().has("topology_id", topology_id).\
        hasLabel("spout").where(outE(conn_type)).dedup().toList()

    spout_tasks = []
    parent_to_child = {}
    # Task IDs whose downstream neighbours have already been queried. A task's
    # children do not depend on which spout reached it, so one set shared by
    # all spouts avoids re-querying shared sub-graphs and diamond joins. It also
    # prevents looping forever should the graph ever contain a cycle.
    expanded_tasks = set()

    for spout in spouts:
        spout_tasks.append(
            graph_client.graph_traversal.V(spout).properties('task_id').value().next())

        # Breadth-first walk. The next frontier gathers the children of *every*
        # vertex in the current frontier, so no branch is dropped. Overwriting
        # the frontier per vertex would only follow the last vertex's children.
        frontier = [spout]
        while frontier:
            next_frontier = []
            for vertex in frontier:
                vertex_task_id = graph_client.graph_traversal.V(vertex).properties(
                    'task_id').value().next()
                if vertex_task_id in expanded_tasks:
                    continue
                expanded_tasks.add(vertex_task_id)

                children = graph_client.graph_traversal.V(vertex).out(
                    conn_type).dedup().toList()
                # Sinks (no downstream vertices) get no entry in the mapping.
                if children:
                    parent_to_child[vertex_task_id] = graph_client.graph_traversal.V(
                        vertex).out(conn_type).properties('task_id').value().dedup().toList()
                    next_frontier.extend(children)
            frontier = next_frontier

    paths: List = path_helper(parent_to_child, spout_tasks)
    LOG.info("Number of paths returned: %d", len(paths))

    end: dt.datetime = dt.datetime.now()
    LOG.info("Time spent in fetching all paths: %d seconds", (end - start).total_seconds())
    return paths