in model/topology/heron/queueing_theory.py [0:0]
def predict_current_performance(
        self, topology_id: str, cluster: str, environ: str,
        spout_traffic: Dict[int, Dict[str, float]],
        **kwargs: Any) -> pd.DataFrame:
    """Predict arrival rates, capacity and back pressure for a running topology.

    Uses measured per-instance service times together with the supplied
    spout emission levels to predict, via the queueing theory model, the
    arrival rate at every downstream instance and whether it will be
    driven into back pressure.

    Arguments:
        topology_id (str): The topology identification string.
        cluster (str): The cluster the topology is running on.
        environ (str): The environment the topology is running in.
        spout_traffic (dict): The expected output of the spout instances,
            keyed by spout task id. These emit values should be in tuples
            per second (tps) otherwise they will not match with the
            service time measurements.
        **kwargs: Passed through to the metrics client. May include
            ``start`` and ``end`` datetimes defining the metrics window.

    Returns:
        pd.DataFrame: One row per (task, stream) with the median service
        latency and rate, the predicted arrival rate, the capacity
        percentage (arrival rate / service rate * 100) and a boolean
        ``back_pressure`` column flagging capacity above 100%.

    Raises:
        TypeError: If any key of ``spout_traffic`` is not an integer
            task id.
        RuntimeError: If the metrics client returns no service time data.
    """
    # Spout traffic must be keyed by integer task ids so it lines up
    # with the task ids in the service time measurements.
    bad_keys = [key for key in spout_traffic if not isinstance(key, int)]
    if bad_keys:
        raise TypeError(
            f"spout_traffic keys must be integer task ids, found "
            f"non-integer keys: {bad_keys}")
    start, end = get_start_end_times(**kwargs)
    metric_bucket_length: int = cast(int,
                                     self.config["metric.bucket.length"])
    LOG.info("Predicting traffic levels and backpressure of currently running "
             "topology %s using queueing theory model", topology_id)
    # Remove the start and end time kwargs so we don't supply them twice to
    # the metrics client.
    # TODO: We need to make this cleaner? Add start and end to topo model?
    other_kwargs: Dict[str, Any] = {key: value
                                    for key, value in kwargs.items()
                                    if key not in ["start", "end"]}
    # Get the service time for all elements
    service_times: pd.DataFrame = self.metrics_client.get_service_times(
        topology_id, cluster, environ, start, end, **other_kwargs)
    if service_times.empty:
        # RuntimeError is a subclass of Exception, so callers catching the
        # previous broad Exception still work.
        raise RuntimeError(
            "Metric client returned empty data frame for service times.")
    # Calculate the service rate for each instance: tuples per second is
    # the reciprocal of the per-tuple latency expressed in seconds.
    service_times["tuples_per_sec"] = 1000.0 / service_times["latency_ms"]
    # Drop the system streams (Heron system streams are prefixed with "__")
    service_times = (service_times[~service_times["stream"]
                                   .str.contains("__")])
    # Calculate the median service time and rate per (task, stream)
    service_time_summary: pd.DataFrame = \
        (service_times[["task", "stream", "latency_ms", "tuples_per_sec"]]
         .groupby(["task", "stream"]).median().reset_index())
    # Get the reference of the latest physical graph entry for this
    # topology, or create a physical graph if there is none.
    topology_ref: str = graph_check(self.graph_client, self.config,
                                    self.tracker_url, cluster, environ,
                                    topology_id)
    # Predict the arrival rate at all instances with the supplied spout
    # traffic. Only the per-instance arrival rates are needed here; the
    # stream manager arrival rates are discarded.
    in_ars, _ = self.predict_arrival_rates(
        topology_id, cluster, environ, spout_traffic, start, end,
        metric_bucket_length, topology_ref)
    combined: pd.DataFrame = service_time_summary.merge(
        in_ars, on=["task", "stream"])
    # Capacity: arrival rate as a percentage of the service rate. Anything
    # over 100% means tuples arrive faster than they can be processed.
    combined["capacity"] = (combined["arrival_rate"] /
                            combined["tuples_per_sec"]) * 100.0
    combined["back_pressure"] = combined["capacity"] > 100.0
    return combined