in model/traffic/heron/stats_summary.py [0:0]
def predict_traffic(self, topology_id: str, cluster: str, environ: str,
**kwargs: Union[str, int, float]) -> Dict[str, Any]:
""" This method will provide a summary of the emit counts from the
spout instances of the specified topology. It will summarise the emit
metrics over the number of hours defined by the source_hours keyword
argument and provide summary statistics (mean, median, min, max and
quantiles) over all instances of each component and for each individual
instance.
        Arguments:
            topology_id (str): The topology ID string.
            cluster (str): The cluster the topology is running on.
            environ (str): The environment the topology is running in.
            **source_hours (int): Optional keyword argument for the number of
                                  hours (backwards from now) of metrics data
                                  to summarise. Defaults to the model's
                                  default_source_hours if not supplied.
            **metrics_sample_period (float): Optional keyword argument
                                             specifying the period (seconds)
                                             of the metrics returned by the
                                             metrics client. If not supplied,
                                             the period is inferred from the
                                             metric timestamps.
        Returns:
            dict: A dictionary with top level keys for "components", which
            maps to summary statistics calculated over all instances of each
            spout component, "instances", which maps to summary statistics
            for each individual instance of each spout component, and
            "details", which records the time window and metric sample period
            used. All emit values are rates in emits per second.

            The dictionary has the form:

                ["components"]
                    [statistic_name]
                        [component_name]
                            [output_stream_name] = emits per second
                ["instances"]
                    [statistic_name]
                        [task_id]
                            [output_stream_name] = emits per second

            All dictionary keys are strings to allow easy conversion to JSON.
"""
if "source_hours" not in kwargs:
LOG.warning("source_hours parameter (indicating how many hours of "
"historical data to summarise) was not provided, "
"using default value of %d hours",
self.default_source_hours)
source_hours: float = self.default_source_hours
else:
source_hours = cast(float, float(kwargs["source_hours"]))
LOG.info("Predicting traffic for topology %s using statistics summary "
"model", topology_id)
end: dt.datetime = dt.datetime.now(dt.timezone.utc)
start: dt.datetime = end - dt.timedelta(hours=source_hours)
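        # Fetch the distinct spout component names for this topology from the
        # graph database, so that the emit counts can be limited to spouts.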
spout_comps: List[str] = (self.graph_client.graph_traversal.V()
.has("topology_id", topology_id)
.hasLabel("spout").values("component")
.dedup().toList())
emit_counts: pd.DataFrame = self.metrics_client.get_emit_counts(
topology_id, cluster, environ, start, end, **kwargs)
spout_emit_counts: pd.DataFrame = emit_counts[
emit_counts["component"].isin(spout_comps)]
if "metrics_sample_period" in kwargs:
time_period_sec: float = \
cast(float, float(kwargs["metrics_sample_period"]))
else:
            # TODO: This method needs to be made more robust: if instances
            # report at interleaved (unique) timestamps the calculated period
            # will be entirely wrong!
            # TODO: Maybe introduce an aggregation time bucket into the
            # metric client methods to create a known time period.
            # TODO: Or just accept the timer frequency of the metrics as an
            # argument? For the TMaster it is always 1 minute, and for other
            # clients it can be supplied in the kwargs passed to the metrics
            # get method, so it will be available.
time_period_sec: float = \
calculate_ts_period(spout_emit_counts.timestamp)
LOG.info("Emit count data was calculated to have a period of %f "
"seconds", time_period_sec)
output: Dict[str, Any] = {}
output["details"] = {"start": start.isoformat(),
"end": end.isoformat(),
"source_hours": source_hours,
"original_metric_frequency_secs":
time_period_sec}
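        # Summarise emit rates over all instances of each spout component,
        # keyed as [statistic][component][stream] (values in emits per second).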
components: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
defaultdict(lambda: defaultdict(dict))
for (comp, stream), comp_data in \
spout_emit_counts.groupby(["component", "stream"]):
LOG.debug("Processing component: %s stream: %s", comp, stream)
components["mean"][comp][stream] = \
(float(comp_data.emit_count.mean()) / time_period_sec)
components["median"][comp][stream] = \
(float(comp_data.emit_count.median()) / time_period_sec)
components["max"][comp][stream] = \
(float(comp_data.emit_count.max()) / time_period_sec)
components["min"][comp][stream] = \
(float(comp_data.emit_count.min()) / time_period_sec)
for quantile in self.quantiles:
components[f"{quantile}-quantile"][comp][stream] = \
(float(comp_data.emit_count.quantile(quantile/100)) /
time_period_sec)
output["components"] = components
instances: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
defaultdict(lambda: defaultdict(dict))
for (task_id, stream), task_data in \
spout_emit_counts.groupby(["task", "stream"]):
LOG.debug("Processing instance: %d stream: %s", task_id, stream)
instances["mean"][str(task_id)][stream] = \
(float(task_data.emit_count.mean()) / time_period_sec)
instances["median"][str(task_id)][stream] = \
(float(task_data.emit_count.median()) / time_period_sec)
instances["max"][str(task_id)][stream] = \
(float(task_data.emit_count.max()) / time_period_sec)
instances["min"][str(task_id)][stream] = \
(float(task_data.emit_count.min()) / time_period_sec)
for quantile in self.quantiles:
instances[f"{quantile}-quantile"][str(task_id)][stream] = \
(float(task_data.emit_count.quantile(quantile/100)) /
time_period_sec)
output["instances"] = instances
return output
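    # Example usage (sketch only): the class name, constructor arguments and
    # the topology, cluster and environ values below are illustrative
    # assumptions, not definitions taken from this module.
    #
    #     model = StatsSummaryTrafficModel(config, metrics_client, graph_client)
    #     summary = model.predict_traffic("my-topology", "cluster1", "prod",
    #                                     source_hours=3)
    #
    #     # Mean emit rate (emits per second) of one spout's default stream:
    #     rate = summary["components"]["mean"]["my-spout"]["default"]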