in graph/analysis/heron/arrival_rates.py [0:0]
def calculate(graph_client: GremlinClient, metrics_client: HeronMetricsClient,
topology_id: str, cluster: str, environ: str, topology_ref: str,
start: dt.datetime, end: dt.datetime, io_bucket_length: int,
tracker_url: str, spout_state: Dict[int, Dict[str, float]],
**kwargs: Union[str, int, float]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Arguments:
graph_client (GremlinClient): The client instance for the graph
database.
metrics_client (HeronMetricsClient): The client instance for the
metrics database.
topology_id (str): The topology identification string.
cluster (str): The cluster the topology is running on.
environ (str): The environment the topology is running in.
topology_ref (str): The reference string for the topology physical
graph to be used in the calculations.
start (dt.datetime): The UTC datetime instance representing the
start of the metric gathering window.
end (dt.datetime): The UTC datetime instance representing the end of
the metric gathering window.
io_bucket_length (int): The length in seconds that metrics should be
aggregated for use in IO ratio calculations.
tracker_url (str): The URL for the Heron Tracker API.
spout_state (dict): A dictionary mapping from instance task id to a
dictionary that maps from output stream name to the
output rate for that spout instance. The arrival
rates will be reported in the same units as this
rate (TPS, TPM, etc.).
**kwargs: Any additional keyword arguments required by the metrics
client query methods. NOTE: These are passed to a cached
method, so all kwargs must be hashable; unhashable
arguments will be removed before being supplied.
Returns:
pd.DataFrame: A DataFrame containing the arrival rate at each
instance.
pd.DataFrame: A DataFrame containing the input and output rate of
each stream manager.
Raises:
RuntimeError: If there is no entry in the graph database for the
supplied topology id and ref.
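Example:
Illustrative sketch only: the client objects, topology details and
spout_state values below are placeholder assumptions, not values
defined by this module.

    arrival_df, strmgr_df = calculate(
        graph_client, metrics_client,
        topology_id="word-count-topology", cluster="prod",
        environ="default", topology_ref="ref-2018-06-01",
        start=dt.datetime(2018, 6, 1, 0, 0, tzinfo=dt.timezone.utc),
        end=dt.datetime(2018, 6, 1, 1, 0, tzinfo=dt.timezone.utc),
        io_bucket_length=60,
        tracker_url="http://heron-tracker.example.com",
        spout_state={1: {"default": 120.0}, 2: {"default": 115.0}})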
"""
# First check that there is a physical graph for the supplied reference in
# the graph database
graph_client.raise_if_missing(topology_id, topology_ref)
LOG.info("Calculating arrival rates for topology %s reference %s using "
"metrics from a %d second period from %s to %s", topology_id,
topology_ref, (end-start).total_seconds(), start.isoformat(),
end.isoformat())
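# Gather the static inputs for the calculation: the instance to instance
# routing probabilities, the levels of the topology DAG, the I/O
# coefficients for each instance, and the instances sending to and
# receiving from each stream manager. (Summary inferred from the returned
# variable names; see _setup_arrival_calcs for the authoritative details.)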
i2i_rps, levels, coefficients, sending_instances, receiving_instances = \
_setup_arrival_calcs(metrics_client, graph_client, topology_id,
cluster, environ, topology_ref, start, end,
io_bucket_length, tracker_url, **kwargs)
topo_traversal: GraphTraversalSource = \
graph_client.topology_subgraph(topology_id, topology_ref)
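# Nested mappings from instance task id to per-stream rates (structure
# mirrors the spout_state argument). The spout output rates supplied by
# the caller seed output_rates; the rest are filled in level by level.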
arrival_rates: ARRIVAL_RATES = defaultdict(lambda: defaultdict(float))
output_rates: OUTPUT_RATES = defaultdict(dict)
output_rates.update(spout_state)
# Step through the tree levels, calculating the output from each level and
# the arrivals at the next. The final level is skipped as its arrival rates
# were calculated while processing the level above and it has no outputs.
for level_number, level in enumerate(levels[:-1]):
LOG.debug("Processing topology level %d", level_number)
if level_number != 0:
# If this is not the spout level then we need to calculate the output
# from the instances in this level (the spout output rates were supplied
# via spout_state).
for source_vertex in level:
output_rates = _calculate_outputs(topo_traversal,
source_vertex, arrival_rates,
output_rates, coefficients)
# Calculate the arrival rates at the downstream instances in the next
# level.
for source_vertex in level:
arrival_rates = _calculate_arrivals(topo_traversal, source_vertex,
arrival_rates, output_rates,
i2i_rps)
# At this stage we have the output and arrival rates for all logically
# connected instances. We now need to map these onto the stream managers
# to calculate their incoming and outgoing tuple rates.
strmgr_in_out: pd.DataFrame = _calc_strmgr_in_out(
sending_instances, receiving_instances, output_rates, arrival_rates)
return _convert_arrs_to_df(arrival_rates), strmgr_in_out