def get_complete_latencies()

in metrics/heron/influxdb/client.py [0:0]


    def get_complete_latencies(self, topology_id: str, cluster: str,
                               environ: str, start: dt.datetime,
                               end: dt.datetime,
                               **kwargs: Union[str, int, float]
                               ) -> pd.DataFrame:
        """ Gets the complete latencies, as a timeseries, for every instance of
        the of all the spout components of the specified topology. The start
        and end times define the window over which to gather the metrics.

        Arguments:
            topology_id (str):    The topology identification string.
            cluster (str):  The cluster the topology is running in.
            environ (str):  The environment the topology is running in (eg.
                              prod, devel, test, etc).
            start (datetime):    utc datetime instance for the start of the
                                    metrics gathering period.
            end (datetime):  utc datetime instance for the end of the
                                metrics gathering period.

        Returns:
            pandas.DataFrame: A DataFrame containing the service time
            measurements as a timeseries. Each row represents a measurement
            with the following columns:

            * timestamp:  The UTC timestamp for the metric,
            * component: The component this metric comes from,
            * task: The instance ID number for the instance that the metric
              comes from,
            * container:  The ID for the container this metric comes from,
              stream: The name of the incoming stream from which the tuples
              that lead to this metric came from,
            * latency_ms: The average execute latency measurement in
              milliseconds for that metric time period.

        Raises:
            RuntimeWarning: If the specified topology has a reliability mode
                            that does not enable complete latency.
            ConnectionError: If the physical plan cannot be extracted from the
                             Heron Tracker API.
        """

        # First we need to check that the supplied topology will actually have
        # complete latencies. Only ATLEAST_ONCE and EXACTLY_ONCE will have
        # complete latency values as acking is disabled for ATMOST_ONCE.
        try:
            physical_plan: Dict[str, Any] = tracker.get_physical_plan(
                self.tracker_url, cluster, environ, topology_id)
        except ConnectionError as conn_err:
            conn_msg: str = (f"Unable to connect to Heron Tracker API at: "
                             f"{self.tracker_url}. Cannot retrieve physical "
                             f"plan for topology: {topology_id}")
            LOG.error(conn_msg)
            raise ConnectionError(conn_msg)

        if (physical_plan["config"]
                ["topology.reliability.mode"] == "ATMOST_ONCE"):
            rm_msg: str = (f"Topology {topology_id} reliability mode is set "
                           f"to ATMOST_ONCE. Complete latency is not "
                           f"available for these types of topologies")
            LOG.warning(rm_msg)
            warnings.warn(rm_msg, RuntimeWarning)
            return pd.DataFrame()

        start_time: str = convert_datetime_to_rfc3339(start)
        end_time: str = convert_datetime_to_rfc3339(end)

        database: str = create_db_name(self.database_prefix, topology_id,
                                       cluster, environ)

        LOG.info("Fetching complete latencies for topology: %s on cluster: %s "
                 "in environment: %s for a %s second time period between %s "
                 "and %s", topology_id, cluster, environ,
                 (end-start).total_seconds(), start_time, end_time)

        self.client.switch_database(database)

        metric_name: str = "complete-latency"
        metric_regex: str = "/complete\-latency\/+.*/"

        measurement_names: List[str] = self.get_metric_measurement_names(
            database, metric_name, metric_regex)

        output: List[Dict[str, Union[str, int, dt.datetime]]] = []

        for measurement_name in measurement_names:

            _, stream = measurement_name.split("/")

            query_str: str = (f"SELECT Component, Instance, value "
                              f"FROM \"{measurement_name}\" "
                              f"WHERE time >= '{start_time}' "
                              f"AND time <= '{end_time}'")

            LOG.debug("Querying %s measurements with influx QL statement: %s",
                      metric_name, query_str)

            results: ResultSet = self.client.query(query_str)

            for point in results.get_points():

                instance: Optional[re.Match] = re.search(
                    INSTANCE_NAME_RE, point["Instance"])

                if instance:
                    instance_dict: Dict[str, str] = instance.groupdict()
                else:
                    LOG.warning("Could not parse instance name: %s",
                                point["Instance"])
                    continue

                row: Dict[str, Union[str, int, dt.datetime]] = {
                    "timestamp": convert_rfc339_to_datetime(point["time"]),
                    "component": point["Component"],
                    "task": int(instance_dict["task"]),
                    "container": int(instance_dict["container"]),
                    "stream": stream,
                    "latency_ms": float(point["value"])}

                output.append(row)

        return pd.DataFrame(output)