in graph/analysis/heron/routing_probabilities.py [0:0]
def set_fields_routing_probs(graph_client: GremlinClient,
                             metrics_client: HeronMetricsClient,
                             topology_id: str, topology_ref: str,
                             start: dt.datetime, end: dt.datetime) -> None:
    """ Writes a "routing_probability" property onto every fields grouped
    "logically_connected" edge in the physical graph identified by the
    supplied topology ID and reference. The probabilities are obtained from
    calculate_inter_instance_rps using metrics from the supplied time window.

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        metrics_client (HeronMetricsClient):    The client instance for the
                                                metrics database.
        topology_id (str):  The topology identification string.
        topology_ref (str): The topology reference string.
        start (dt.datetime):    The UTC datetime object for the start of the
                                metrics gathering window.
        end (dt.datetime):  The UTC datetime object for the end of the
                            metrics gathering window.

    Raises:
        KeyError:   Propagated from the pandas lookup if the routing
                    probability table has no row for a connection's
                    (source_task, stream, destination_task) combination.
    """

    LOG.info("Setting fields grouping routing probabilities for topology %s "
             "reference %s using metrics data from %s to %s", topology_id,
             topology_ref, start.isoformat(), end.isoformat())

    # Traversal source restricted to this topology's subgraph
    subgraph: GraphTraversalSource = graph_client.topology_subgraph(
        topology_id, topology_ref)

    # Instance to instance routing probabilities for the metrics window
    rps_table: pd.DataFrame = calculate_inter_instance_rps(
        metrics_client, topology_id, start, end)

    # Key the table by (source, stream, destination) so each connection's
    # routing probability can be selected directly by label
    rps_table.set_index(["source_task", "stream", "destination_task"],
                        inplace=True)

    # Select every fields grouped logical edge in the physical graph...
    fields_edges_query = (subgraph.V()
                          .outE("logically_connected")
                          .has("grouping", "FIELDS"))

    # ...and project out the lookup key for each one, together with the edge
    # itself so that it can be updated later
    projected_query = (
        fields_edges_query
        .project("source_task", "stream", "edge", "destination_task")
        .by(__.outV().properties("task_id").value())
        .by(__.properties("stream").value())
        .by()
        .by(__.inV().properties("task_id").value()))

    fields_edges: List[Dict[str, Union[int, str, Edge]]] = \
        projected_query.toList()

    LOG.debug("Processing %d fields grouped connections for topology %s "
              "reference %s", len(fields_edges), topology_id, topology_ref)

    link: Dict[str, Union[int, str, Edge]]
    for link in fields_edges:

        source_task = link["source_task"]
        stream = link["stream"]
        destination_task = link["destination_task"]

        LOG.debug("Processing connection from instance %d to %d on stream %s",
                  source_task, destination_task, stream)

        # Label based selection on the three level index
        rp_entry = rps_table.loc[source_task, stream, destination_task]
        routing_prob: float = rp_entry["routing_probability"]

        # next() forces the traversal to execute so the property is written
        update_query = (subgraph.E(link["edge"])
                        .property("routing_probability", routing_prob))
        update_query.next()