def set_fields_routing_probs()

in graph/analysis/heron/routing_probabilities.py [0:0]


def set_fields_routing_probs(graph_client: GremlinClient,
                             metrics_client: HeronMetricsClient,
                             topology_id: str, topology_ref: str,
                             start: dt.datetime, end: dt.datetime) -> None:
    """ Sets the routing probabilities for fields grouped logical connections
    in the physical graph with the supplied topology ID and reference. Routing
    probabilities are calculated using metrics from the defined time window.

    Every "logically_connected" edge whose "grouping" property is "FIELDS" has
    its "routing_probability" property set to the value calculated for the
    matching (source task, stream, destination task) combination.

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        metrics_client (HeronMetricsClient): The client instance for metrics
                                             database.
        topology_id (str):  The topology identification string.
        topology_ref (str): The topology reference string.
        start (dt.datetime):    The UTC datetime object for the start of the
                                metrics gathering window.
        end (dt.datetime):  The UTC datetime object for the end of the metrics
                            gathering window.

    Raises:
        KeyError:   If the calculated routing probabilities contain no entry
                    for one of the fields grouped connections in the graph.
                    Edges processed before the failing connection will
                    already have been updated.
    """

    LOG.info("Setting fields grouping routing probabilities for topology %s "
             "reference %s using metrics data from %s to %s", topology_id,
             topology_ref, start.isoformat(), end.isoformat())

    topology_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    # The result is expected to hold "source_task", "stream",
    # "destination_task" and "routing_probability" columns.
    i_to_i_rps: pd.DataFrame = calculate_inter_instance_rps(
        metrics_client, topology_id, start, end)

    # Index the routing probabilities by route so that each connection can be
    # looked up directly. A new frame is bound (rather than using
    # inplace=True) so the DataFrame handed back by the helper is not mutated.
    rps_by_route: pd.DataFrame = i_to_i_rps.set_index(
        ["source_task", "stream", "destination_task"])

    # Get a list of all fields grouped connections in the physical graph. Each
    # entry carries the task IDs at both ends of the edge, the stream name and
    # the edge itself so it can be updated below.
    fields_connections: List[Dict[str, Union[int, str, Edge]]] = \
        (topology_traversal.V()
         .outE("logically_connected")
         .has("grouping", "FIELDS")
         .project("source_task", "stream", "edge", "destination_task")
         .by(__.outV().properties("task_id").value())
         .by(__.properties("stream").value())
         .by()
         .by(__.inV().properties("task_id").value())
         .toList())

    LOG.debug("Processing %d fields grouped connections for topology %s "
              "reference %s", len(fields_connections), topology_id,
              topology_ref)

    connection: Dict[str, Union[int, str, Edge]]
    for connection in fields_connections:

        LOG.debug("Processing connection from instance %d to %d on stream %s",
                  connection["source_task"], connection["destination_task"],
                  connection["stream"])

        # Select the row by its full index key and the column in a single
        # .loc call. This avoids chained indexing and makes it explicit that
        # the tuple is the row key and not a (row, column) pair.
        route_key = (connection["source_task"], connection["stream"],
                     connection["destination_task"])
        routing_prob: float = rps_by_route.loc[route_key,
                                               "routing_probability"]

        # next() forces the traversal to execute so the property is written
        (topology_traversal.E(connection["edge"])
         .property("routing_probability", routing_prob).next())