in model/topology/heron/queueing_theory.py [0:0]
def predict_arrival_rates(self, topology_id: str,
                          cluster: str, environ: str,
                          spout_traffic: Dict[int, Dict[str, float]],
                          start: dt.datetime, end: dt.datetime,
                          metric_bucket_length: int,
                          topology_ref: str = None, **kwargs: Any
                          ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Predict arrival rates for the elements of a topology.

    Delegates the calculation to ``arrival_rates.calculate`` and then
    collapses the per-instance result so that there is a single row per
    ``(task, stream)`` pair.

    Arguments:
        topology_id (str): Identifier of the topology.
        cluster (str): Cluster the topology runs on.
        environ (str): Environment the topology runs in.
        spout_traffic (dict): Nested mapping passed unchanged to
            ``arrival_rates.calculate``.
            NOTE(review): presumably task id -> stream name -> emit
            rate; confirm against ``arrival_rates.calculate``.
        start (dt.datetime): Start of the metrics period, forwarded to
            ``arrival_rates.calculate``.
        end (dt.datetime): End of the metrics period, forwarded to
            ``arrival_rates.calculate``.
        metric_bucket_length (int): Forwarded to
            ``arrival_rates.calculate``.
            NOTE(review): units are not visible here (seconds?).
        topology_ref (str): Optional reference of the physical graph to
            use. When falsy (``None`` or empty), one is obtained via
            ``graph_check``.
        **kwargs: Extra keyword arguments forwarded to
            ``arrival_rates.calculate``.

    Returns:
        A 2-tuple of DataFrames ``(in_ars, strmgr_ars)``:

        1. ``instance_ars`` from ``arrival_rates.calculate`` summed over
           each ``("task", "incoming_stream")`` group, with the group
           keys restored as columns and ``incoming_stream`` renamed to
           ``stream``.
        2. The second DataFrame returned by
           ``arrival_rates.calculate``, unmodified.
    """
    if not topology_ref:
        # Get the reference of the latest physical graph entry for this
        # topology, or create a physical graph if there are none.
        topology_ref = graph_check(self.graph_client, self.config,
                                   self.tracker_url, cluster, environ,
                                   topology_id)

    # Predict arrival rates for all elements.
    instance_ars: pd.DataFrame
    strmgr_ars: pd.DataFrame
    instance_ars, strmgr_ars = arrival_rates.calculate(
        self.graph_client, self.metrics_client, topology_id, cluster,
        environ, topology_ref, start, end, metric_bucket_length,
        self.tracker_url, spout_traffic, **kwargs)

    # Sum the arrivals from each source component of each incoming stream.
    # The aggregation is performed exactly once; groupby().sum() returns a
    # new frame, so calling it without using the result has no effect.
    # ``index=str`` is kept from the original so the resulting index labels
    # are strings, in case callers depend on that.
    in_ars: pd.DataFrame = (
        instance_ars.groupby(["task", "incoming_stream"]).sum()
        .reset_index()
        .rename(index=str, columns={"incoming_stream": "stream"}))

    return in_ars, strmgr_ars