in graph/analysis/heron/arrival_rates.py [0:0]
def _setup_arrival_calcs(metrics_client: HeronMetricsClient,
                         graph_client: GremlinClient,
                         topology_id: str, cluster: str, environ: str,
                         topology_ref: str, start: dt.datetime,
                         end: dt.datetime, io_bucket_length: int,
                         tracker_url: str, **kwargs: Union[str, int, float]
                         ) -> Tuple[pd.Series, List[List[Vertex]],
                                    pd.Series, Dict[str, List[int]],
                                    Dict[str, List[int]]]:
    """ Helper method which sets up the data needed for the arrival rate
    calculations. This is a separate cached method as these data are not
    affected by the traffic (spout_state) and so do not need to be
    recalculated for a new traffic level for the same topology id/ref.

    Arguments:
        metrics_client (HeronMetricsClient): The client passed to the routing
                                             probability and input/output
                                             ratio calculations.
        graph_client (GremlinClient):   The client used to obtain the topology
                                        subgraph traversal; also passed to the
                                        input/output ratio calculation.
        topology_id (str):  The topology identification string.
        cluster (str):  The cluster the topology is running on.
        environ (str):  The environment the topology is running in.
        topology_ref (str): The reference string of the topology graph to be
                            analysed.
        start (dt.datetime):    Start of the metrics gathering period.
        end (dt.datetime):  End of the metrics gathering period.
        io_bucket_length (int): Bucket length handed to lstsq_io_ratios
                                (units are not visible from this method).
        tracker_url (str):  Tracker URL handed to
                            calculate_inter_instance_rps.
        **kwargs:   Additional keyword arguments forwarded unchanged to
                    calculate_inter_instance_rps and lstsq_io_ratios.

    Returns:
        A 5-tuple containing:
            1) pd.Series of routing probabilities indexed by (source_task,
               destination_task, stream).
            2) The levels of the logical graph, as returned by get_levels.
            3) pd.Series of input/output coefficients indexed by (task,
               output_stream, input_stream, source_component).
            4) Dictionary mapping the stream manager "id" property to the
               list of task ids of the spout and bolt instances that send
               tuples to that stream manager.
            5) Dictionary mapping the stream manager "id" property to the
               list of task ids of the bolt instances that receive tuples
               from that stream manager.
    """

    topo_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    # Calculate the routing probabilities for the defined metric gathering
    # period. Selecting the single "routing_probability" column after
    # re-indexing yields a Series, not a DataFrame.
    i2i_rps: pd.Series = (calculate_inter_instance_rps(
        metrics_client, topology_id, cluster, environ, start, end,
        tracker_url, **kwargs)
                          .set_index(["source_task", "destination_task",
                                      "stream"])
                          ["routing_probability"])

    # Get the vertex levels for the logical graph tree
    LOG.info("Calculating levels for topology %s reference %s", topology_id,
             topology_ref)
    levels: List[List[Vertex]] = get_levels(topo_traversal)
    LOG.debug("Found %d levels in topology %s reference %s", len(levels),
              topology_id, topology_ref)

    # Calculate the input output ratios for each instances using data from
    # the defined metrics gathering period
    coefficients: pd.Series = lstsq_io_ratios(
        metrics_client, graph_client, topology_id, cluster, environ, start,
        end, io_bucket_length, **kwargs).set_index(["task", "output_stream",
                                                    "input_stream",
                                                    "source_component"]
                                                   )["coefficient"]

    # Get the details of the incoming and outgoing physical connections for
    # stream manager in the topology

    # Get a dictionary mapping from stream manager id string to a list of the
    # instances (within each container) that will send tuples to each stream
    # manager. Grouping by the "id" property means the keys are the property
    # values rather than Vertex objects.
    sending_instances: Dict[str, List[int]] = \
        (topo_traversal.V().hasLabel("stream_manager")
         .group().by("id").by(in_("physically_connected")
                              .hasLabel(P.within("spout", "bolt"))
                              .values("task_id")
                              .fold())
         .next())

    # Get a dictionary mapping from stream manager id string to a list of the
    # instances (within each container) that will receive tuples from each
    # stream manager. Only bolts are matched on the receiving side.
    receiving_instances: Dict[str, List[int]] = \
        (topo_traversal.V().hasLabel("stream_manager")
         .group().by("id").by(out("physically_connected")
                              .hasLabel("bolt").values("task_id").fold())
         .next())

    return (i2i_rps, levels, coefficients, sending_instances,
            receiving_instances)