def predict_traffic()

in model/traffic/heron/prophet.py
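
Forecasts the tuple arrival rate of the named Heron topology using Prophet
models fitted to recent metrics, and returns per-component and per-instance
summaries (mean, median, min, max and the configured quantiles) in tuples
per second.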


    def predict_traffic(self, topology_id: str, cluster: str, environ: str,
                        **kwargs: Union[str, int, float]) -> Dict[str, Any]:
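        # Optional keyword arguments: source_hours (float) - hours of
        # historical metrics to summarise; future_mins (int) - forecast
        # horizon in minutes; metrics_sample_period (float) - metric sample
        # period in seconds, used to convert predicted counts into rates.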

        if "source_hours" not in kwargs:
            LOG.warning("source_hours parameter (indicating how many hours of "
                        "historical data to summarise) was not provided, "
                        "using default value of %d hours",
                        self.default_source_hours)
            source_hours: float = self.default_source_hours
        else:
            source_hours = float(kwargs["source_hours"])

        source_end: dt.datetime = dt.datetime.now(dt.timezone.utc)
        source_start: dt.datetime = (source_end -
                                     dt.timedelta(hours=source_hours))
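
        # Metrics from the [source_start, source_end] window are used to fit
        # the Prophet models in the predict_per_* calls below.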

        if "future_mins" not in kwargs:
            LOG.warning("future_mins parameter (indicating how many minutes "
                        "into the future traffic should be predicted was not "
                        "provided, using default value of %d minutes",
                        self.default_future_minutes)
            future_mins: int = self.default_future_minutes
        else:
            future_mins = int(kwargs["future_mins"])

        LOG.info("Predicting traffic over the next %d minutes for topology %s "
                 "using a Prophet model trained on metrics from a %.1f hour "
                 "period from %s to %s", future_mins, topology_id,
                 source_hours, source_start.isoformat(),
                 source_end.isoformat())

        if "metrics_sample_period" in kwargs:
            time_period_sec: float = \
                cast(float, float(kwargs["metrics_sample_period"]))
        else:
            LOG.warning("metrics_sample_period (indicating the period of time the metrics client retrieves metrics for)"
                        " was not supplied. Using default value of %d seconds.", self.default_metrics_sample_period)
            time_period_sec: int = self.default_metrics_sample_period
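
        # The yhat predictions below are divided by time_period_sec,
        # converting per-sample-period tuple counts into per-second rates.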

        output: Dict[str, Any] = {}

        # Per component predictions

        component_traffic: pd.DataFrame = predict_per_component(
            self.metrics_client, self.tracker_url, topology_id,
            cluster, environ, source_start, source_end, future_mins)
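
        # component_traffic is expected to contain one forecast row per
        # timestep for each (component, stream) pair, with Prophet's point
        # prediction in its yhat column.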

        traffic_by_component: pd.core.groupby.DataFrameGroupBy = \
            component_traffic.groupby(["component", "stream"])

        components: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
            defaultdict(lambda: defaultdict(dict))
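
        # The loop below fills a nested mapping: summary statistic -> spout
        # component -> stream -> predicted arrival rate (tuples per second).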

        for (spout_component, stream), data in traffic_by_component:

            components["mean"][spout_component][stream] = \
                (float(data.yhat.mean()) / time_period_sec)

            components["median"][spout_component][stream] = \
                (float(data.yhat.median()) / time_period_sec)

            components["max"][spout_component][stream] = \
                (float(data.yhat.max()) / time_period_sec)

            components["min"][spout_component][stream] = \
                (float(data.yhat.min()) / time_period_sec)

            for quantile in self.quantiles:
                components[f"{quantile}-quantile"][spout_component][stream] = \
                    (float(data.yhat.quantile(quantile/100)) /
                     time_period_sec)

        output["components"] = components

        # Per instance predictions

        instance_traffic: pd.DataFrame = predict_per_instance(
            self.metrics_client, self.tracker_url, topology_id, cluster,
            environ, source_start, source_end, future_mins)
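
        # instance_traffic mirrors component_traffic, but rows are keyed by
        # the numeric task ID of each instance rather than component name.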

        traffic_by_task: pd.core.groupby.DataFrameGroupBy = \
            instance_traffic.groupby(["task", "stream"])

        instances: DefaultDict[str, DefaultDict[str, Dict[str, float]]] = \
            defaultdict(lambda: defaultdict(dict))

        for (task_id, stream), data in traffic_by_task:

            instances["mean"][str(task_id)][stream] = \
                (float(data.yhat.mean()) / time_period_sec)

            instances["median"][str(task_id)][stream] = \
                (float(data.yhat.median()) / time_period_sec)

            instances["max"][str(task_id)][stream] = \
                (float(data.yhat.max()) / time_period_sec)

            instances["min"][str(task_id)][stream] = \
                (float(data.yhat.min()) / time_period_sec)

            for quantile in self.quantiles:
                instances[f"{quantile}-quantile"][str(task_id)][stream] = \
                    (float(data.yhat.quantile(quantile/100)) /
                     time_period_sec)

        output["instances"] = instances
        return output
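
A minimal usage sketch: model is assumed to be an initialised instance of the
class defining this method (the class itself is not shown in this excerpt),
and the topology, cluster and environment values are purely illustrative:

    # Hypothetical identifiers; substitute values from your own deployment.
    prediction = model.predict_traffic(
        topology_id="word-count", cluster="cluster1", environ="prod",
        source_hours=24, future_mins=30, metrics_sample_period=60)

    # Mean predicted emit rate (tuples per second) for each spout stream.
    for spout, streams in prediction["components"]["mean"].items():
        for stream, rate in streams.items():
            print(f"{spout}/{stream}: {rate:.2f} tuples/sec")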