in graph/builder/heron/builder.py [0:0]
def _create_physical_connections(graph_client: GremlinClient, topology_id: str,
topology_ref: str) -> None:
LOG.info("Creating physical connections for topology: %s, reference: "
"%s", topology_id, topology_ref)
topo_traversal: GraphTraversalSource = \
graph_client.topology_subgraph(topology_id, topology_ref)
# First get all logically connected pairs of vertex and their associated
# containers and stream managers
logical_edges: List[Dict[str, Union[Vertex, Edge]]] = (
topo_traversal.V().hasLabel(P.within("bolt", "spout"))
.outE("logically_connected")
.project("source_instance", "source_container",
"source_stream_manager", "l_edge", "destination_instance",
"destination_container", "destination_stream_manager")
.by(outV())
.by(outV().out("is_within"))
.by(outV().out("is_within").in_("is_within")
.hasLabel("stream_manager"))
.by()
.by(inV())
.by(inV().out("is_within"))
.by(inV().out("is_within").in_("is_within")
.hasLabel("stream_manager"))
.toList())
LOG.debug("Processing %d logical connected vertices", len(logical_edges))
for logical_edge in logical_edges:
source: Vertex = logical_edge["source_instance"]
source_container: Vertex = logical_edge["source_container"]
source_stream_manager: Vertex = logical_edge["source_stream_manager"]
destination: Vertex = logical_edge["destination_instance"]
destination_container: Vertex = logical_edge["destination_container"]
destination_stream_manager: Vertex = \
logical_edge["destination_stream_manager"]
l_edge: Edge = logical_edge["l_edge"]
# Connect the source instance to its stream manager, checking first
# if the connection already exists
(graph_client.graph_traversal.V(source)
.coalesce(out("physically_connected").is_(source_stream_manager),
addE("physically_connected").to(source_stream_manager))
.next())
if source_container == destination_container:
# If the source and destination instances are in the same
# container then they share the same stream manager so just use
# the source stream manager found above. Connect the source
# stream manager to the destination instance
(graph_client.graph_traversal.V(source_stream_manager)
.coalesce(out("physically_connected").is_(destination),
addE("physically_connected").to(destination))
.next())
# Set the logical edge for this pair to "local"
graph_client.graph_traversal.E(l_edge).property("type",
"local").next()
else:
# Connect the two stream managers (if they aren't already)
(graph_client.graph_traversal.V(source_stream_manager)
.coalesce(
out("physically_connected").is_(destination_stream_manager),
addE("physically_connected").to(destination_stream_manager))
.next())
(graph_client.graph_traversal.V(destination_stream_manager)
.coalesce(out("physically_connected").is_(destination),
addE("physically_connected").to(destination))
.next())
# Set the logical edge for this pair to "remote"
graph_client.graph_traversal.E(l_edge).property("type",
"remote").next()