def predict_current_performance()

in model/topology/heron/queueing_theory.py


    def predict_current_performance(
            self, topology_id: str, cluster: str, environ: str,
            spout_traffic: Dict[int, Dict[str, float]],
            **kwargs: Any) -> pd.DataFrame:
        """
        Predicts the arrival rates, utilization and back pressure of the
        instances in the currently running topology at the supplied spout
        traffic levels.

        Arguments:
            topology_id (str):  The topology identification string.
            cluster (str):  The cluster the topology is running on.
            environ (str):  The environment the topology is running in.
            spout_traffic (dict):   The expected output of the spout
                                    instances. These emit values should be
                                    in tuples per second (tps), otherwise
                                    they will not match the service time
                                    measurements.
            **kwargs:   Additional keyword arguments for the metrics
                        client, which should include the "start" and "end"
                        of the metrics gathering window.

        Returns:
            pandas.DataFrame:   One row per (task, stream) pair with the
                                median service time and rate, the predicted
                                arrival rate, the capacity (utilization as
                                a percentage) and a back pressure flag.
        """
        # TODO: check spout traffic keys are integers!
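        # For example: {0: {"default": 100.0}, 1: {"default": 100.0}} maps
        # each spout task ID to the emit rate (tps) of each of its output
        # streams ("default" here is an illustrative stream name)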
        start, end = get_start_end_times(**kwargs)

        metric_bucket_length: int = cast(int,
                                         self.config["metric.bucket.length"])

        LOG.info("Predicting traffic levels and backpressure of currently running "
                 "topology %s using queueing theory model", topology_id)

        # Remove the start and end time kwargs so we don't supply them twice to
        # the metrics client.
        # TODO: Make this cleaner. Should start and end be added to the
        # topology model?
        other_kwargs: Dict[str, Any] = {key: value
                                        for key, value in kwargs.items()
                                        if key not in ["start", "end"]}

        # Get the service time for all elements
        service_times: pd.DataFrame = self.metrics_client.get_service_times(
            topology_id, cluster, environ, start, end, **other_kwargs)
        if service_times.empty:
            raise RuntimeError("Metrics client returned an empty DataFrame "
                               "for service times")

        # Calculate the service rate for each instance
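        # (latency_ms is the per-tuple service time, so its reciprocal in
        # seconds is the maximum rate at which an instance can process
        # tuples)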
        service_times["tuples_per_sec"] = 1.0 / (service_times["latency_ms"] /
                                                 1000.0)

        # Drop the system streams
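        # (stream names containing a double underscore denote Heron's
        # internal system streams, which should not be part of the
        # performance prediction)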
        service_times = (service_times[~service_times["stream"]
                         .str.contains("__")])

        # Calculate the median service time and rate
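        # (the median rather than the mean keeps transient latency spikes
        # from skewing the summary)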
        service_time_summary: pd.DataFrame = \
            (service_times[["task", "stream", "latency_ms", "tuples_per_sec"]]
             .groupby(["task", "stream"]).median().reset_index())

        # Get the reference of the latest physical graph entry for this
        # topology, or create a physical graph if there is none.
        topology_ref: str = graph_check(self.graph_client, self.config,
                                        self.tracker_url, cluster, environ,
                                        topology_id)

        # Predict the arrival rate at all instances for the supplied spout
        # traffic (the stream manager arrival rates are also returned but
        # are not used here)
        in_ars, strmgr_ars = self.predict_arrival_rates(
            topology_id, cluster, environ, spout_traffic, start, end,
            metric_bucket_length, topology_ref)

        # Join the median service statistics with the predicted arrival
        # rates for each (task, stream) pair; the default inner merge drops
        # pairs missing from either side
        combined: pd.DataFrame = service_time_summary.merge(
            in_ars, on=["task", "stream"])

        # Capacity expresses the predicted utilization of each instance as
        # a percentage of its measured service rate
        combined["capacity"] = (combined["arrival_rate"] /
                                combined["tuples_per_sec"]) * 100.0

        # An instance whose predicted arrival rate exceeds its service rate
        # (capacity above 100%) is expected to suffer back pressure
        combined["back_pressure"] = combined["capacity"] > 100.0

        return combined
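
For illustration, here is a minimal sketch of how this method might be
called. The build_qt_model() helper and the identifier values below are
hypothetical, and the sketch assumes the "start" and "end" keyword
arguments are accepted as datetime objects (as implied by the
get_start_end_times(**kwargs) call above).

    from datetime import datetime, timedelta

    # Hypothetical helper returning a configured queueing theory topology
    # model; the real construction depends on the surrounding application
    # configuration
    qt_model = build_qt_model()

    # Spout traffic: task ID -> output stream name -> emit rate in tuples
    # per second (tps)
    spout_traffic = {
        0: {"default": 150.0},
        1: {"default": 150.0},
    }

    # Use the last two hours of metrics as the service time sample window
    end = datetime.utcnow()
    start = end - timedelta(hours=2)

    results = qt_model.predict_current_performance(
        topology_id="example-topology", cluster="cluster1", environ="prod",
        spout_traffic=spout_traffic, start=start, end=end)

    # Instances predicted to be over capacity, and therefore likely to
    # experience back pressure
    print(results[results["back_pressure"]])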