# model/traffic/heron/prophet.py
def predict_traffic(self, topology_id: str, cluster: str, environ: str,
**kwargs: Union[str, int, float]) -> Dict[str, Any]:
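    """Predict the traffic arriving at the specified topology's components
    and instances, using Prophet models fitted to historical spout metrics.

    Optional keyword arguments (instance defaults are used when omitted):
    "source_hours" (hours of historical metrics to train on),
    "future_mins" (minutes into the future to predict) and
    "metrics_sample_period" (the sample period, in seconds, of the
    metrics returned by the metrics client, used to convert predictions
    into per-second rates).

    Returns a dictionary with "components" and "instances" entries, each
    mapping summary statistic name -> component name or task ID ->
    stream name -> predicted traffic rate (per second).
    """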
if "source_hours" not in kwargs:
LOG.warning("source_hours parameter (indicating how many hours of "
"historical data to summarise) was not provided, "
"using default value of %d hours",
self.default_source_hours)
source_hours: float = self.default_source_hours
else:
        source_hours = float(kwargs["source_hours"])
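    # The metrics source window ends now (UTC) and spans source_hours
    # hours of historical data.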
    source_end: dt.datetime = dt.datetime.now(dt.timezone.utc)
source_start: dt.datetime = (source_end -
dt.timedelta(hours=source_hours))
if "future_mins" not in kwargs:
LOG.warning("future_mins parameter (indicating how many minutes "
"into the future traffic should be predicted was not "
"provided, using default value of %d minutes",
self.default_future_minutes)
future_mins: int = self.default_future_minutes
else:
        future_mins = int(kwargs["future_mins"])
LOG.info("Predicting traffic over the next %d minutes for topology %s "
"using a Prophet model trained on metrics from a %f hour "
"period from %s to %s", future_mins, topology_id,
(source_end-source_start).total_seconds() / 3600,
source_start.isoformat(), source_end.isoformat())
if "metrics_sample_period" in kwargs:
time_period_sec: float = \
cast(float, float(kwargs["metrics_sample_period"]))
else:
LOG.warning("metrics_sample_period (indicating the period of time the metrics client retrieves metrics for)"
" was not supplied. Using default value of %d seconds.", self.default_metrics_sample_period)
time_period_sec: int = self.default_metrics_sample_period
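    # The yhat predictions below are divided by the sample period to
    # convert Prophet's per-sample-period predictions into per-second
    # rates.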
output: Dict[str, Any] = {}
# Per component predictions
component_traffic: pd.DataFrame = predict_per_component(
self.metrics_client, self.tracker_url, topology_id,
cluster, environ, source_start, source_end, future_mins)
traffic_by_component: pd.core.groupby.DataFrameGroupBy = \
component_traffic.groupby(["component", "stream"])
components: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
defaultdict(lambda: defaultdict(dict))
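    # components maps summary statistic -> spout component -> stream ->
    # predicted traffic rate (per second)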
for (spout_component, stream), data in traffic_by_component:
components["mean"][spout_component][stream] = \
(float(data.yhat.mean()) / time_period_sec)
components["median"][spout_component][stream] = \
(float(data.yhat.median()) / time_period_sec)
components["max"][spout_component][stream] = \
(float(data.yhat.max()) / time_period_sec)
components["min"][spout_component][stream] = \
(float(data.yhat.min()) / time_period_sec)
for quantile in self.quantiles:
components[f"{quantile}-quantile"][spout_component][stream] = \
(float(data.yhat.quantile(quantile/100)) /
time_period_sec)
output["components"] = components
# Per instance predictions
instance_traffic: pd.DataFrame = predict_per_instance(
self.metrics_client, self.tracker_url, topology_id, cluster,
environ, source_start, source_end, future_mins)
traffic_by_task: pd.core.groupby.DataFrameGroupBy = \
instance_traffic.groupby(["task", "stream"])
instances: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
defaultdict(lambda: defaultdict(dict))
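    # instances maps summary statistic -> task ID (as a string) -> stream ->
    # predicted traffic rate (per second)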
for (task_id, stream), data in traffic_by_task:
instances["mean"][str(task_id)][stream] = \
(float(data.yhat.mean()) / time_period_sec)
instances["median"][str(task_id)][stream] = \
(float(data.yhat.median()) / time_period_sec)
instances["max"][str(task_id)][stream] = \
(float(data.yhat.max()) / time_period_sec)
instances["min"][str(task_id)][stream] = \
(float(data.yhat.min()) / time_period_sec)
for quantile in self.quantiles:
instances[f"{quantile}-quantile"][str(task_id)][stream] = \
(float(data.yhat.quantile(quantile/100)) /
time_period_sec)
output["instances"] = instances
return output
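
# Example usage (hypothetical caller: `model` stands for a configured
# instance of this class, and the topology, cluster, component and stream
# names below are made up):
#
#     prediction = model.predict_traffic(
#         "wordcount", "cluster1", "prod", source_hours=24,
#         future_mins=30, metrics_sample_period=60)
#     mean_rate = prediction["components"]["mean"]["word-spout"]["default"]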