graph/analysis/heron/routing_probabilities.py (80 lines of code) (raw):
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains methods for calculating the routing probabilities of
heron topologies. """
import logging
import datetime as dt
from typing import List, Dict, Union
import pandas as pd
from gremlin_python.process.traversal import P
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.graph_traversal import GraphTraversalSource
from gremlin_python.structure.graph import Edge
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.graph.gremlin.client import GremlinClient
from caladrius.metrics.heron.topology.routing_probabilities \
import calculate_inter_instance_rps
# Module-level logger, named after this module via ``__name__``; used by the
# functions below for their info/debug progress messages.
LOG: logging.Logger = logging.getLogger(__name__)
def get_comp_links_by_grouping(
        graph_traversal: GraphTraversalSource, grouping: str
) -> List[Dict[str, str]]:
    """ Fetches the distinct component level connections that use the
    supplied stream grouping.

    Every vertex labelled "bolt" or "spout" is taken as a source, its outgoing
    "logically_connected" edges are filtered on their "grouping" property and
    the vertex at the far end of each remaining edge is the destination. Each
    match is reduced to the "component" property of the two vertices and the
    "stream" property of the edge, and repeated triples are removed.

    Arguments:
        graph_traversal (GraphTraversalSource): A GraphTraversalSource instance
                                                linked to the topology subgraph
                                                whose connections are to be
                                                queried.
        grouping (str): The stream grouping the returned connections must
                        have.

    Returns:
        A list of dictionaries, one per distinct connection, with the keys
        "source" (source component name), "stream" (stream name) and
        "destination" (destination component name).
    """
    # Label the source vertices so they can be selected again later on
    source_vertices = (graph_traversal.V()
                       .hasLabel(P.within("bolt", "spout"))
                       .as_("source"))

    # Keep only the logical edges which carry the requested grouping
    grouped_edges = (source_vertices.outE("logically_connected")
                     .has("grouping", grouping)
                     .as_("stream"))

    destination_vertices = grouped_edges.inV().as_("destination")

    # Project each labelled element down to a name: the by() modulators are
    # applied in the same order as the labels passed to select()
    name_triples = (destination_vertices
                    .select("source", "stream", "destination")
                    .by("component").by("stream").by("component"))

    return name_triples.dedup().toList()
def set_shuffle_routing_probs(graph_client: GremlinClient,
                              topology_id: str, topology_ref: str) -> None:
    """ This method will set the routing probability for shuffle connections in
    the graph with the supplied topology ID and reference.

    For every distinct source component -> stream -> destination component
    connection with "SHUFFLE" grouping, the routing probability is 1 divided
    by the number of vertices whose "component" property equals the destination
    component. That value is written to the "routing_probability" property of
    each "logically_connected" edge on that stream which runs from a vertex of
    the source component to a vertex of the destination component.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        topology_id (str): The topology identification string.
        topology_ref (str): The topology reference string.
    """

    LOG.info("Calculating routing probabilities for shuffle grouped logical "
             "connections in the graph of topology %s reference %s",
             topology_id, topology_ref)

    topology_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    for comp_conn in get_comp_links_by_grouping(topology_traversal, "SHUFFLE"):

        LOG.debug("Calculating routing probabilities for logical connections "
                  "between instances of %s and %s on the %s stream",
                  comp_conn["source"], comp_conn["destination"],
                  comp_conn["stream"])

        # Calculate the shuffle grouped connections routing probability based
        # on the number of downstream instances for this connection. The
        # division is evaluated by the graph server via the math() step.
        shuffle_rp: float = (topology_traversal.V()
                             .has("component", comp_conn["destination"])
                             .count().math("1/_").next())

        # Apply the calculated routing probability to the logical connections
        # with this stream name between the source and destination instances
        # of these components. The destination filter has to come BEFORE the
        # property() step: filtering afterwards would still write the value to
        # every edge on this stream leaving the source component, including
        # those that lead to other components subscribed to the same stream.
        (topology_traversal.V().has("component", comp_conn["source"])
         .outE("logically_connected").has("stream", comp_conn["stream"])
         .where(__.inV().has("component", comp_conn["destination"]))
         .property("routing_probability", shuffle_rp)
         .iterate())
def set_fields_routing_probs(graph_client: GremlinClient,
                             metrics_client: HeronMetricsClient,
                             topology_id: str, topology_ref: str,
                             start: dt.datetime, end: dt.datetime) -> None:
    """ Writes a routing probability onto every fields grouped logical
    connection of the physical graph with the supplied topology ID and
    reference. The probabilities are derived from metrics recorded within the
    supplied time window.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        metrics_client (HeronMetricsClient): The client instance for metrics
                                             database.
        topology_id (str): The topology identification string.
        topology_ref (str): The topology reference string.
        start (dt.datetime): The UTC datetime object for the start of the
                             metrics gathering window.
        end (dt.datetime): The UTC datetime object for the end of the metrics
                           gathering window.
    """

    LOG.info("Setting fields grouping routing probabilities for topology %s "
             "reference %s using metrics data from %s to %s", topology_id,
             topology_ref, start.isoformat(), end.isoformat())

    topology_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    i_to_i_rps: pd.DataFrame = calculate_inter_instance_rps(
        metrics_client, topology_id, start, end)

    # Index the probabilities by (source task, stream, destination task) so
    # that each connection can be looked up directly by that key
    lookup_levels: List[str] = ["source_task", "stream", "destination_task"]
    i_to_i_rps.set_index(lookup_levels, inplace=True)

    # Gather every fields grouped edge together with the task IDs of the
    # vertices at either end and the name of the stream it carries
    fields_edges = (topology_traversal.V()
                    .outE("logically_connected")
                    .has("grouping", "FIELDS"))

    fields_connections: List[Dict[str, Union[int, str, Edge]]] = (
        fields_edges
        .project("source_task", "stream", "edge", "destination_task")
        .by(__.outV().properties("task_id").value())
        .by(__.properties("stream").value())
        .by()
        .by(__.inV().properties("task_id").value())
        .toList())

    LOG.debug("Processing %d fields grouped connections for topology %s "
              "reference %s", len(fields_connections), topology_id,
              topology_ref)

    for connection in fields_connections:

        source_task = connection["source_task"]
        stream = connection["stream"]
        destination_task = connection["destination_task"]

        LOG.debug("Processing connection from instance %d to %d on stream %s",
                  source_task, destination_task, stream)

        # Look the probability up using the three level index set above
        link_key = (source_task, stream, destination_task)
        routing_prob: float = i_to_i_rps.loc[link_key]["routing_probability"]

        # Store the value on the edge returned by the projection; next()
        # forces the traversal to execute
        edge_update = (topology_traversal.E(connection["edge"])
                       .property("routing_probability", routing_prob))
        edge_update.next()