def get_emit_counts()

in metrics/heron/influxdb/client.py [0:0]


    def get_emit_counts(self, topology_id: str, cluster: str, environ: str,
                        start: dt.datetime, end: dt.datetime,
                        **kwargs: Union[str, int, float]) -> pd.DataFrame:
        """ Gets a time series of the emit count of each of the instances in
        the specified topology.

        The database name is built from the client's database prefix, the
        topology ID, cluster and environment. Every measurement matching the
        emit-count metric is then queried for points between start and end
        (both bounds inclusive). Points whose "Instance" field cannot be
        parsed with INSTANCE_NAME_RE are logged at warning level and skipped.

        Arguments:
            topology_id (str): The topology ID string.
            cluster (str):  The cluster name.
            environ (str):  The environment that the topology is running in.
            start (datetime.datetime):  UTC datetime instance for the start of
                                        the metrics gathering period.
            end (datetime.datetime):    UTC datetime instance for the end of
                                        the metrics gathering period.
            **kwargs:   Accepted for interface compatibility; not read by this
                        method.

        Returns:
            pandas.DataFrame:   A DataFrame containing the emit count
            measurements as a timeseries. Each row represents a measurement
            with the following columns:

            * timestamp: The UTC timestamp for the metric,
            * component: The component this metric comes from,
            * task: The instance ID number for the instance that the metric
              comes from,
            * container: The ID for the container this metric comes from,
            * stream: The name of the outgoing stream from which the tuples
              that lead to this metric came from,
            * emit_count: The emit count during the metric time period.

            If no points are found the DataFrame is empty but still carries
            the columns listed above.

        Raises:
            ValueError: If a measurement name contains no "/" separator
                        between the metric name and the stream name.
        """

        start_time: str = convert_datetime_to_rfc3339(start)
        end_time: str = convert_datetime_to_rfc3339(end)

        database: str = create_db_name(self.database_prefix, topology_id,
                                       cluster, environ)

        LOG.info("Fetching emit counts for topology: %s on cluster: %s in "
                 "environment: %s for a %s second time period between %s and "
                 "%s", topology_id, cluster, environ,
                 (end-start).total_seconds(), start_time, end_time)

        self.client.switch_database(database)

        metric_name: str = "emit-count"
        # Raw string: "\-" and "\/" are not valid Python escape sequences, so
        # a plain literal triggers a Deprecation/SyntaxWarning. The runtime
        # value handed to InfluxDB is unchanged.
        metric_regex: str = r"/emit\-count\/+.*/"

        measurement_names: List[str] = self.get_metric_measurement_names(
            database, metric_name, metric_regex)

        # Fixed column order so that an empty result still has the documented
        # schema. This matches the key order of the row dicts built below.
        columns: List[str] = ["timestamp", "component", "task", "container",
                              "stream", "emit_count"]

        output: List[Dict[str, Union[str, int, dt.datetime]]] = []

        for measurement_name in measurement_names:

            # Split on the first "/" only: everything after the metric name is
            # the stream name, even if it contains further "/" characters
            # (which the measurement regex above allows).
            _, stream = measurement_name.split("/", 1)

            query_str: str = (f"SELECT Component, Instance, value "
                              f"FROM \"{measurement_name}\" "
                              f"WHERE time >= '{start_time}' "
                              f"AND time <= '{end_time}'")

            LOG.debug("Querying %s measurements with influx QL statement: %s",
                      metric_name, query_str)

            results: ResultSet = self.client.query(query_str)

            for point in results.get_points():

                instance: Optional[re.Match] = re.search(
                    INSTANCE_NAME_RE, point["Instance"])

                if not instance:
                    # Without a parsable instance name the task and container
                    # IDs are unknown, so the point cannot be attributed.
                    LOG.warning("Could not parse instance name: %s",
                                point["Instance"])
                    continue

                instance_dict: Dict[str, str] = instance.groupdict()

                row: Dict[str, Union[str, int, dt.datetime]] = {
                    "timestamp": convert_rfc339_to_datetime(point["time"]),
                    "component": point["Component"],
                    "task": int(instance_dict["task"]),
                    "container": int(instance_dict["container"]),
                    "stream": stream,
                    "emit_count": int(point["value"])}

                output.append(row)

        return pd.DataFrame(output, columns=columns)