def predict_traffic()

in model/traffic/heron/stats_summary.py [0:0]


    def predict_traffic(self, topology_id: str, cluster: str, environ: str,
                        **kwargs: Union[str, int, float]) -> Dict[str, Any]:
        """ This method will provide a summary of the emit counts from the
        spout instances of the specified topology. It will summarise the emit
        metrics over the number of hours defined by the source_hours keyword
        argument and provide summary statistics (mean, median, min, max and
        quantiles) over all instances of each component and for each individual
        instance.

        Arguments:
            topology_id (str):  The topology ID string
            **source_hours (int):   Optional keyword argument for the number of
                                    hours (backwards from now) of metrics data
                                    to summarise.
            **metric_sample_period (float): This is an optional argument
                                            specifying the period (seconds) of
                                            the metrics returned by the metrics
                                            client.
        Returns:
            dict:   A dictionary with top level keys for "components" which
            links to summary statistics for each spout instance and "instances"
            with summary statistics for each individual instance of each spout
            component.

            The dictionary has the form:
                ["components"]
                    [statistic_name]
                        [component_name]
                            [output_stream_name] = emit_count

                ["instances"]
                    [statistic_name]
                        [task_id]
                            [output_stream_name] = emit_count

            All dictionary keys are stings to allow easy conversion to JSON.
        """

        if "source_hours" not in kwargs:
            LOG.warning("source_hours parameter (indicating how many hours of "
                        "historical data to summarise) was not provided, "
                        "using default value of %d hours",
                        self.default_source_hours)
            source_hours: float = self.default_source_hours
        else:
            source_hours = cast(float, float(kwargs["source_hours"]))

        LOG.info("Predicting traffic for topology %s using statistics summary "
                 "model", topology_id)

        end: dt.datetime = dt.datetime.now(dt.timezone.utc)
        start: dt.datetime = end - dt.timedelta(hours=source_hours)

        spout_comps: List[str] = (self.graph_client.graph_traversal.V()
                                  .has("topology_id", topology_id)
                                  .hasLabel("spout").values("component")
                                  .dedup().toList())

        emit_counts: pd.DataFrame = self.metrics_client.get_emit_counts(
            topology_id, cluster, environ, start, end, **kwargs)

        spout_emit_counts: pd.DataFrame = emit_counts[
            emit_counts["component"].isin(spout_comps)]

        if "metrics_sample_period" in kwargs:
            time_period_sec: float = \
                cast(float, float(kwargs["metrics_sample_period"]))
        else:
            # TODO: This method needs to be made robust, interleaved unique
            # timestamps will return entirely the wrong period!!!
            # TODO: Maybe introduce aggregation time bucket into metric client
            # methods to create known time period.
            # TODO: Or just accept the timer frequency of the metrics as an
            # argument? For TMaster it is always 1 min and for others this can
            # be supplied in the kwargs passed to the metrics get method so it
            # will be available.
            time_period_sec: float = \
                    calculate_ts_period(spout_emit_counts.timestamp)
            LOG.info("Emit count data was calculated to have a period of %f "
                     "seconds", time_period_sec)

        output: Dict[str, Any] = {}

        output["details"] = {"start": start.isoformat(),
                             "end": end.isoformat(),
                             "source_hours": source_hours,
                             "original_metric_frequency_secs":
                             time_period_sec}

        components: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
            defaultdict(lambda: defaultdict(dict))

        for (comp, stream), comp_data in \
                spout_emit_counts.groupby(["component", "stream"]):

            LOG.debug("Processing component: %s stream: %s", comp, stream)

            components["mean"][comp][stream] = \
                (float(comp_data.emit_count.mean()) / time_period_sec)

            components["median"][comp][stream] = \
                (float(comp_data.emit_count.median()) / time_period_sec)

            components["max"][comp][stream] = \
                (float(comp_data.emit_count.max()) / time_period_sec)

            components["min"][comp][stream] = \
                (float(comp_data.emit_count.min()) / time_period_sec)

            for quantile in self.quantiles:
                components[f"{quantile}-quantile"][comp][stream] = \
                    (float(comp_data.emit_count.quantile(quantile/100)) /
                     time_period_sec)

        output["components"] = components

        instances: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
            defaultdict(lambda: defaultdict(dict))

        for (task_id, stream), task_data in \
                spout_emit_counts.groupby(["task", "stream"]):

            LOG.debug("Processing instance: %d stream: %s", task_id, stream)

            instances["mean"][str(task_id)][stream] = \
                (float(task_data.emit_count.mean()) / time_period_sec)

            instances["median"][str(task_id)][stream] = \
                (float(task_data.emit_count.median()) / time_period_sec)

            instances["max"][str(task_id)][stream] = \
                (float(task_data.emit_count.max()) / time_period_sec)

            instances["min"][str(task_id)][stream] = \
                (float(task_data.emit_count.min()) / time_period_sec)

            for quantile in self.quantiles:
                instances[f"{quantile}-quantile"][str(task_id)][stream] = \
                    (float(task_data.emit_count.quantile(quantile/100)) /
                     time_period_sec)

        output["instances"] = instances

        return output