model/traffic/heron/prophet.py

# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains methods and classes for forecasting the traffic from
spout instances in a Heron topology using Facebook's Prophet time series
prediction package: https://facebook.github.io/prophet/ """

import logging
import datetime as dt

from typing import Any, Dict, DefaultDict, Union, cast, List, Optional
from collections import defaultdict

import pandas as pd

from fbprophet import Prophet

from caladrius.common.heron import tracker
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.model.traffic.heron.base import HeronTrafficModel
from caladrius.graph.gremlin.client import GremlinClient

LOG: logging.Logger = logging.getLogger(__name__)

# Type aliases for the nested dictionaries of Prophet models, keyed by spout
# component name, task ID and stream name.
INSTANCE_MODELS = DefaultDict[str, DefaultDict[int, Dict[str, Prophet]]]
COMPONENT_MODELS = DefaultDict[str, Dict[str, Prophet]]


def get_spout_emissions(metric_client: HeronMetricsClient, tracker_url: str,
                        topology_id: str, cluster: str, environ: str,
                        start: dt.datetime, end: dt.datetime) -> pd.DataFrame:
    """ Fetches the emit counts for every spout component of the supplied
    topology over the given time period. """

    emit_counts: pd.DataFrame = metric_client.get_emit_counts(
        topology_id, cluster, environ, start, end)

    lplan: Dict[str, Any] = tracker.get_logical_plan(tracker_url, cluster,
                                                     environ, topology_id)

    spout_emits: pd.DataFrame = \
        emit_counts[emit_counts.component.isin(lplan["spouts"].keys())]

    return spout_emits


def build_component_models(
        metric_client: HeronMetricsClient, tracker_url: str,
        topology_id: str, cluster: str, environ: str,
        start: Optional[dt.datetime] = None,
        end: Optional[dt.datetime] = None,
        spout_emits: Optional[pd.DataFrame] = None) -> COMPONENT_MODELS:
    """ Builds a Prophet model for each stream of each spout component,
    averaging the emit counts across the instances of each component. Either
    the start and end datetimes or a pre-fetched spout emits DataFrame must
    be supplied. """

    LOG.info("Creating traffic models for spout components of topology %s",
             topology_id)

    if spout_emits is None:
        if start and end:
            spout_emits = get_spout_emissions(metric_client, tracker_url,
                                              topology_id, cluster, environ,
                                              start, end)
        else:
            err: str = ("Either start and end datetime instances or the "
                        "spout emits should be provided")
            LOG.error(err)
            raise RuntimeError(err)

    spout_comp_emits: pd.DataFrame = \
        (spout_emits.groupby(["component", "stream", "timestamp"])
         .mean()["emit_count"].reset_index())

    output: COMPONENT_MODELS = defaultdict(dict)

    for (spout_comp, stream), data in spout_comp_emits.groupby(
            ["component", "stream"]):

        LOG.info("Creating traffic model for spout %s stream %s", spout_comp,
                 stream)

        df: pd.DataFrame = (data[["timestamp", "emit_count"]]
                            .rename(index=str, columns={"timestamp": "ds",
                                                        "emit_count": "y"}))

        model: Prophet = Prophet()
        model.fit(df)

        output[spout_comp][stream] = model

    return output


def predict_per_component(metric_client: HeronMetricsClient,
                          tracker_url: str, topology_id: str, cluster: str,
                          environ: str, start: dt.datetime, end: dt.datetime,
                          future_mins: int) -> pd.DataFrame:
    """ Builds per-component traffic models from the given metrics period
    and forecasts the emit counts future_mins minutes into the future. """

    models: COMPONENT_MODELS = \
        build_component_models(metric_client, tracker_url, topology_id,
                               cluster, environ, start, end)

    return run_per_component(models, future_mins)


def run_per_component(models: COMPONENT_MODELS,
                      future_mins: int) -> pd.DataFrame:
    """ Runs each of the supplied per-component Prophet models future_mins
    minutes into the future and combines the forecasts into a single
    DataFrame. """

    output: Optional[pd.DataFrame] = None

    for spout_comp, stream_models in models.items():
        for stream, model in stream_models.items():

            future: pd.DataFrame = model.make_future_dataframe(
                periods=future_mins, freq='T', include_history=False)

            forecast: pd.DataFrame = model.predict(future)
            forecast["stream"] = stream
            forecast["component"] = spout_comp

            if output is None:
                output = forecast
            else:
                output = output.append(forecast)

    return output
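
# The Prophet workflow above is the same for components and instances: fit a
# model on a two-column DataFrame ("ds" holding timestamps, "y" holding emit
# counts) and predict over a future DataFrame. A minimal stand-alone sketch
# of that workflow, using synthetic data (all values below are illustrative
# assumptions, not taken from a real topology):
#
#     history = pd.DataFrame({
#         "ds": pd.date_range("2018-01-01", periods=120, freq="T"),
#         "y": [100 + (i % 10) for i in range(120)],  # synthetic emit counts
#     })
#     model = Prophet()
#     model.fit(history)
#     future = model.make_future_dataframe(periods=30, freq="T",
#                                          include_history=False)
#     forecast = model.predict(future)  # "yhat" holds the predicted counts
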

def build_instance_models(
        metric_client: HeronMetricsClient, tracker_url: str,
        topology_id: str, cluster: str, environ: str,
        start: Optional[dt.datetime] = None,
        end: Optional[dt.datetime] = None,
        spout_emits: Optional[pd.DataFrame] = None) -> INSTANCE_MODELS:
    """ Builds a Prophet model for each stream of each individual spout
    instance (task). Either the start and end datetimes or a pre-fetched
    spout emits DataFrame must be supplied. """

    if start and end and spout_emits is None:
        spout_emits = get_spout_emissions(metric_client, tracker_url,
                                          topology_id, cluster, environ,
                                          start, end)
    elif spout_emits is None and ((not start and end) or
                                  (start and not end)):
        err: str = ("Both start and end datetimes should be provided if the "
                    "spout emits DataFrame is not")
        LOG.error(err)
        raise RuntimeError(err)
    elif spout_emits is None and not start and not end:
        err = ("Either start and end datetimes or the spout emits should be "
               "provided")
        LOG.error(err)
        raise RuntimeError(err)
    else:
        spout_emits = cast(pd.DataFrame, spout_emits)

    spout_groups: pd.core.groupby.DataFrameGroupBy = \
        (spout_emits[["component", "task", "stream", "timestamp",
                      "emit_count"]]
         .groupby(["component", "task", "stream"]))

    output: INSTANCE_MODELS = defaultdict(lambda: defaultdict(dict))

    for (spout_comp, task, stream), data in spout_groups:

        df: pd.DataFrame = (data[["timestamp", "emit_count"]]
                            .rename(index=str, columns={"timestamp": "ds",
                                                        "emit_count": "y"}))

        model: Prophet = Prophet()
        model.fit(df)

        output[spout_comp][task][stream] = model

    return output


def run_per_instance_models(models: INSTANCE_MODELS,
                            future_mins: int) -> pd.DataFrame:
    """ Runs each of the supplied per-instance Prophet models future_mins
    minutes into the future and combines the forecasts into a single
    DataFrame. """

    output: Optional[pd.DataFrame] = None

    for spout_comp, task_dict in models.items():
        for task, stream_models in task_dict.items():
            for stream, model in stream_models.items():

                future: pd.DataFrame = model.make_future_dataframe(
                    periods=future_mins, freq='T', include_history=False)

                forecast: pd.DataFrame = model.predict(future)
                forecast["stream"] = stream
                forecast["task"] = task
                forecast["component"] = spout_comp

                if output is None:
                    output = forecast
                else:
                    output = output.append(forecast)

    return output


def predict_per_instance(metric_client: HeronMetricsClient, tracker_url: str,
                         topology_id: str, cluster: str, environ: str,
                         start: dt.datetime, end: dt.datetime,
                         future_mins: int) -> pd.DataFrame:
    """ Builds per-instance traffic models from the given metrics period and
    forecasts the emit counts future_mins minutes into the future. """

    models: INSTANCE_MODELS = build_instance_models(
        metric_client, tracker_url, topology_id, cluster, environ, start,
        end)

    return run_per_instance_models(models, future_mins)


class ProphetTrafficModel(HeronTrafficModel):
    """ This model class provides traffic forecasting for Heron topologies
    using the Facebook Prophet time series forecasting package:
    https://facebook.github.io/prophet/ """

    name: str = "prophet"

    description: str = ("This model uses the Facebook Prophet time series "
                        "prediction package to forecast the spout traffic "
                        "for a given amount of time into the future.")

    def __init__(self, config: dict, metrics_client: HeronMetricsClient,
                 graph_client: GremlinClient) -> None:

        super().__init__(config, metrics_client, graph_client)

        self.metrics_client: HeronMetricsClient

        if "heron.tracker.url" in config:
            self.tracker_url: str = config["heron.tracker.url"]
        else:
            tracker_msg: str = (
                "The Heron Tracker URL was not supplied in the configuration "
                "dictionary given to the Prophet traffic model. Please check "
                "the keys under 'heron.traffic.model.config' in the main "
                "configuration file")
            LOG.error(tracker_msg)
            raise RuntimeError(tracker_msg)
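
        # As an illustrative sketch of the expected configuration (the URL
        # is an assumption; the numeric values match the defaults applied
        # below), a minimal config dictionary might look like:
        #
        #     config = {
        #         "heron.tracker.url": "http://heron-tracker.example.com",
        #         "prophet.model.default.source.hours": 24,
        #         "prophet.model.default.future.mins": 30,
        #         "prophet.model.default.metrics_sample_period": 60,
        #         "prophet.model.quantiles": [10, 90, 95, 99],
        #     }
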
        if "prophet.model.default.source.hours" in config:
            self.default_source_hours: float = \
                config["prophet.model.default.source.hours"]
        else:
            self.default_source_hours = 24
            LOG.warning("Default source hours were not supplied via the "
                        "configuration file. Setting to %d hours.",
                        self.default_source_hours)

        if "prophet.model.default.future.mins" in config:
            self.default_future_minutes: int = \
                config["prophet.model.default.future.mins"]
        else:
            self.default_future_minutes = 30
            LOG.warning("Default future minutes parameter was not supplied "
                        "via the configuration file. Setting to %d minutes.",
                        self.default_future_minutes)

        if "prophet.model.default.metrics_sample_period" in config:
            self.default_metrics_sample_period: int = \
                config["prophet.model.default.metrics_sample_period"]
        else:
            self.default_metrics_sample_period = 60
            LOG.warning("Default metrics sample period parameter was not "
                        "supplied via the configuration file. Setting to %d "
                        "seconds.", self.default_metrics_sample_period)

        if "prophet.model.quantiles" in config:
            self.quantiles: List[int] = config["prophet.model.quantiles"]
        else:
            self.quantiles = [10, 90, 95, 99]
            LOG.warning("Quantile values were not set via the configuration "
                        "file, using defaults: 10, 90, 95, 99.")

    def predict_traffic(self, topology_id: str, cluster: str, environ: str,
                        **kwargs: Union[str, int, float]) -> Dict[str, Any]:
        """ Predicts the per-component and per-instance spout traffic
        (emission rates in tuples per second) for the supplied topology. """

        if "source_hours" not in kwargs:
            LOG.warning("source_hours parameter (indicating how many hours "
                        "of historical data to summarise) was not provided, "
                        "using default value of %d hours",
                        self.default_source_hours)
            source_hours: float = self.default_source_hours
        else:
            source_hours = float(kwargs["source_hours"])

        source_end: dt.datetime = \
            dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
        source_start: dt.datetime = (source_end -
                                     dt.timedelta(hours=source_hours))

        if "future_mins" not in kwargs:
            LOG.warning("future_mins parameter (indicating how many minutes "
                        "into the future traffic should be predicted) was "
                        "not provided, using default value of %d minutes",
                        self.default_future_minutes)
            future_mins: int = self.default_future_minutes
        else:
            future_mins = int(kwargs["future_mins"])

        LOG.info("Predicting traffic over the next %d minutes for topology "
                 "%s using a Prophet model trained on metrics from a %f "
                 "hour period from %s to %s", future_mins, topology_id,
                 (source_end - source_start).total_seconds() / 3600,
                 source_start.isoformat(), source_end.isoformat())

        if "metrics_sample_period" in kwargs:
            time_period_sec: float = float(kwargs["metrics_sample_period"])
        else:
            LOG.warning("metrics_sample_period (indicating the period of "
                        "time each metrics sample covers) was not supplied. "
                        "Using default value of %d seconds.",
                        self.default_metrics_sample_period)
            time_period_sec = self.default_metrics_sample_period
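
        # Prophet's "yhat" column holds the forecast emit count per metrics
        # sample period (an assumption about the metrics client semantics),
        # so dividing each summary statistic by the period length in seconds
        # converts it into an emission rate in tuples per second. For example
        # (illustrative only):
        #
        #     rate_mean = forecast.yhat.mean() / time_period_sec
        #     rate_p95 = forecast.yhat.quantile(0.95) / time_period_sec
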
        output: Dict[str, Any] = {}

        # Per-component predictions

        component_traffic: pd.DataFrame = predict_per_component(
            self.metrics_client, self.tracker_url, topology_id, cluster,
            environ, source_start, source_end, future_mins)

        traffic_by_component: pd.core.groupby.DataFrameGroupBy = \
            component_traffic.groupby(["component", "stream"])

        components: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
            defaultdict(lambda: defaultdict(dict))

        for (spout_component, stream), data in traffic_by_component:

            components["mean"][spout_component][stream] = \
                float(data.yhat.mean()) / time_period_sec
            components["median"][spout_component][stream] = \
                float(data.yhat.median()) / time_period_sec
            components["max"][spout_component][stream] = \
                float(data.yhat.max()) / time_period_sec
            components["min"][spout_component][stream] = \
                float(data.yhat.min()) / time_period_sec

            for quantile in self.quantiles:
                components[f"{quantile}-quantile"][spout_component][stream] \
                    = (float(data.yhat.quantile(quantile / 100)) /
                       time_period_sec)

        output["components"] = components

        # Per-instance predictions

        instance_traffic: pd.DataFrame = predict_per_instance(
            self.metrics_client, self.tracker_url, topology_id, cluster,
            environ, source_start, source_end, future_mins)

        traffic_by_task: pd.core.groupby.DataFrameGroupBy = \
            instance_traffic.groupby(["task", "stream"])

        instances: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
            defaultdict(lambda: defaultdict(dict))

        for (task_id, stream), data in traffic_by_task:

            instances["mean"][str(task_id)][stream] = \
                float(data.yhat.mean()) / time_period_sec
            instances["median"][str(task_id)][stream] = \
                float(data.yhat.median()) / time_period_sec
            instances["max"][str(task_id)][stream] = \
                float(data.yhat.max()) / time_period_sec
            instances["min"][str(task_id)][stream] = \
                float(data.yhat.min()) / time_period_sec

            for quantile in self.quantiles:
                instances[f"{quantile}-quantile"][str(task_id)][stream] = \
                    (float(data.yhat.quantile(quantile / 100)) /
                     time_period_sec)

        output["instances"] = instances

        return output
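
# Example usage (a minimal sketch: the client and graph objects, and the
# topology identifiers, are assumptions rather than values defined in this
# module):
#
#     model = ProphetTrafficModel(config, metrics_client, graph_client)
#     result = model.predict_traffic("my-topology", "cluster1", "prod",
#                                    source_hours=24, future_mins=30)
#     result["components"]["mean"]  # per-stream mean tuples/sec per spout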