def _create_physical_connections()

in graph/builder/heron/builder.py [0:0]


def _create_physical_connections(graph_client: GremlinClient, topology_id: str,
                                 topology_ref: str) -> None:
    """ Adds "physically_connected" edges for every logical connection in
    the topology subgraph and tags each logical edge as "local" or "remote".

    For each "logically_connected" edge leaving a "bolt" or "spout" vertex
    the path source instance -> source stream manager [-> destination stream
    manager] -> destination instance is created. The hop between two stream
    managers is only added when the source and destination containers
    differ. Edges that already exist are not duplicated.

    Arguments:
        graph_client (GremlinClient):   Client used to obtain the topology
                                        subgraph traversal and to issue the
                                        edge and property updates.
        topology_id (str):  Topology identifier, passed to the client's
                            topology_subgraph method.
        topology_ref (str): Topology reference, passed to the client's
                            topology_subgraph method.
    """

    def _ensure_physical_edge(tail: "Vertex", head: "Vertex") -> None:
        # coalesce() evaluates its child traversals in order and uses the
        # first one that yields a result, so the edge is only added when the
        # lookup for an existing tail -> head connection comes back empty.
        (graph_client.graph_traversal.V(tail)
         .coalesce(out("physically_connected").is_(head),
                   addE("physically_connected").to(head))
         .next())

    LOG.info("Creating physical connections for topology: %s, "
             "reference: %s", topology_id, topology_ref)

    topo_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    # The order of these keys must match the order of the by() modulators
    # in the projection below.
    projection_keys = ("source_instance", "source_container",
                       "source_stream_manager", "l_edge",
                       "destination_instance", "destination_container",
                       "destination_stream_manager")

    # Gather, for every logical edge, the instance at each end together with
    # the container it sits in and the stream manager of that container.
    rows: List[Dict[str, Union[Vertex, Edge]]] = (
        topo_traversal.V().hasLabel(P.within("bolt", "spout"))
        .outE("logically_connected")
        .project(*projection_keys)
        .by(outV())
        .by(outV().out("is_within"))
        .by(outV().out("is_within").in_("is_within")
            .hasLabel("stream_manager"))
        .by()
        .by(inV())
        .by(inV().out("is_within"))
        .by(inV().out("is_within").in_("is_within")
            .hasLabel("stream_manager"))
        .toList())

    LOG.debug("Processing %d logical connected vertices", len(rows))

    for row in rows:
        src_instance: Vertex = row["source_instance"]
        src_container: Vertex = row["source_container"]
        src_manager: Vertex = row["source_stream_manager"]
        dst_instance: Vertex = row["destination_instance"]
        dst_container: Vertex = row["destination_container"]
        dst_manager: Vertex = row["destination_stream_manager"]
        logical: Edge = row["l_edge"]

        # The sending instance always hands off to its own stream manager
        _ensure_physical_edge(src_instance, src_manager)

        if src_container == dst_container:
            # Both instances live in the same container, so the source
            # stream manager delivers straight to the destination instance
            last_hop_manager = src_manager
            edge_type = "local"
        else:
            # Different containers: route via the destination container's
            # stream manager, linking the two stream managers first
            _ensure_physical_edge(src_manager, dst_manager)
            last_hop_manager = dst_manager
            edge_type = "remote"

        _ensure_physical_edge(last_hop_manager, dst_instance)

        # Record on the logical edge whether the pair is local or remote
        (graph_client.graph_traversal.E(logical)
         .property("type", edge_type)
         .next())