in graph/builder/heron/builder.py [0:0]
def _create_logical_connections(
        graph_client: GremlinClient,
        topology_id: str,
        topology_ref: str,
        logical_plan: Dict[str, Any]) -> None:
    """Add "logically_connected" edges between a topology's instances.

    For every bolt listed under ``logical_plan["bolts"]`` the vertices whose
    "component" property equals the bolt name are fetched. Then, for each
    entry in that bolt's "inputs", the vertices whose "component" property
    equals the entry's "component_name" are fetched and an edge is added
    from every such source vertex to every bolt vertex. Each edge carries
    the input's "stream_name" and "grouping" values as its "stream" and
    "grouping" properties. The number of edges added is logged at the end.

    Args:
        graph_client: Object whose ``topology_subgraph(topology_id,
            topology_ref)`` method supplies the traversal source used for
            every query issued here.
        topology_id: Topology identifier; passed to ``topology_subgraph``
            and included in the log output.
        topology_ref: Topology reference; passed to ``topology_subgraph``.
        logical_plan: Mapping with a "bolts" entry that maps each bolt name
            to a mapping containing an "inputs" iterable. Every input is
            indexed by "component_name" and, when at least one edge is
            created for it, by "stream_name" and "grouping".
    """
    LOG.info("Adding logical connections to topology %s instances",
             topology_id)

    traversal: GraphTraversalSource = graph_client.topology_subgraph(
        topology_id, topology_ref)

    edges_created: int = 0

    for bolt_name, bolt_data in logical_plan["bolts"].items():
        LOG.debug("Adding logical connections for instances of "
                  "destination bolt: %s", bolt_name)

        # Vertices of the bolt on the receiving end of the streams; fetched
        # once per bolt and reused for all of its inputs.
        bolt_query = traversal.V().has("component", bolt_name)
        receivers: List[Vertex] = bolt_query.toList()

        for incoming_stream in bolt_data["inputs"]:
            # Vertices of the component that emits this input stream.
            upstream_query = traversal.V().has(
                "component", incoming_stream["component_name"])
            senders: List[Vertex] = upstream_query.toList()

            # Every (receiver, sender) combination, receivers varying
            # slowest, so edges are issued in a fixed, predictable order.
            pairs = ((destination, source)
                     for destination in receivers
                     for source in senders)

            for destination, source in pairs:
                new_edge = traversal.V(source).addE("logically_connected")
                new_edge = new_edge.property(
                    "stream", incoming_stream["stream_name"])
                new_edge = new_edge.property(
                    "grouping", incoming_stream["grouping"])
                # next() forces the traversal to run so the edge is
                # actually submitted.
                new_edge.to(destination).next()

                edges_created += 1

    LOG.info("Created %d logical connections", edges_created)