metrics/heron/influxdb/client.py (322 lines of code) (raw):

# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0

""" This module contains classes and methods for extracting metrics from an
InfluxDB server."""

import re
import logging
import warnings

import datetime as dt

from typing import Union, List, DefaultDict, Dict, Optional, Any
from functools import lru_cache
from collections import defaultdict

import pandas as pd

from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet

from caladrius.common.heron import tracker
from caladrius.metrics.heron.client import HeronMetricsClient

LOG: logging.Logger = logging.getLogger(__name__)

INFLUX_TIME_FORMAT: str = "%Y-%m-%dT%H:%M:%S.%fZ"

INSTANCE_NAME_RE_STR: str = r"container_(?P<container>\d+)_.*_(?P<task>\d+)"
INSTANCE_NAME_RE: re.Pattern = re.compile(INSTANCE_NAME_RE_STR)


@lru_cache(maxsize=128, typed=False)
def create_db_name(
        prefix: str, topology: str, cluster: str, environ: str) -> str:
    """ Function for forming the InfluxDB database name from the supplied
    details. This function will cache results to speed up name creation.

    Arguments:
        prefix (str):   The string prepended to all database names for this
                        Heron setup.
        topology (str): The topology ID string.
        cluster (str):  The cluster name.
        environ (str):  The environment that the topology is running in.

    Returns:
        The database name string formed from the supplied details.
    """
    return f"{prefix}-{cluster}-{environ}-{topology}"


def convert_rfc339_to_datetime(time_str: str) -> dt.datetime:
    """ Converts an InfluxDB RFC3339 timestamp string into a naive Python
    datetime object.

    Arguments:
        time_str (str): An RFC3339 format timestamp.

    Returns:
        datetime.datetime: A naive datetime object.
    """
    return dt.datetime.strptime(time_str, INFLUX_TIME_FORMAT)


def convert_datetime_to_rfc3339(dt_obj: dt.datetime) -> str:
    """ Converts a Python datetime object into an InfluxDB RFC3339 timestamp
    string. The datetime object is assumed to be naive (no timezone).

    Arguments:
        dt_obj (datetime.datetime): A naive datetime object.

    Returns:
        str: An RFC3339 format timestamp.
""" return dt_obj.strftime(INFLUX_TIME_FORMAT) class HeronInfluxDBClient(HeronMetricsClient): """ Class for extracting Heron metrics from a InfluxDB server """ def __init__(self, config: dict) -> None: super().__init__(config) try: self.host: str = config["influx.host"] self.port: int = int(config["influx.port"]) self.database_prefix: str = config["influx.database.prefix"] self.tracker_url: str = config["heron.tracker.url"] except KeyError as kerr: msg: str = f"Required configuration keys were missing: {kerr.args}" LOG.error(msg) raise KeyError(msg) else: self.username: Optional[str] = None self.password: Optional[str] = None if ("influx.user" in config) and ("influx.password" in config): self.username = config["influx.user"] self.password = config["influx.password"] LOG.info("Creating InfluxDB client for user: %s, on host: %s", config["influx.user"], self.host) self.client: InfluxDBClient = InfluxDBClient( host=self.host, port=self.port, username=self.username, password=self.password) elif "influx.user" in config and "influx.password" not in config: pw_msg: str = (f"Password for InfluxDB user: " f"{config['influx.user']} was not provided") LOG.error(pw_msg) raise KeyError(pw_msg) elif not ("influx.user" in config) and ("influx.password" in config): user_msg: str = "InfluxDB user information was not provided" LOG.error(user_msg) raise KeyError(user_msg) else: LOG.info("Creating InfluxDB client for sever on host: %s", self.host) self.client: InfluxDBClient = InfluxDBClient(host=self.host, port=self.port) self.metric_name_cache: DefaultDict[str, DefaultDict[str, List[str]]] = \ defaultdict(lambda: defaultdict(list)) def __hash__(self) -> int: if self.username and self.password: return hash(self.host + str(self.port) + self.database_prefix + self.username + self.password) return hash(self.host + str(self.port) + self.database_prefix) def __eq__(self, other: object) -> bool: if not isinstance(other, HeronInfluxDBClient): return False if other.username and other.password: other_hash: int = hash(other.host + str(other.port) + other.database_prefix + other.username + other.password) else: other_hash = hash(other.host + str(other.port) + other.database_prefix) if self.__hash__() == other_hash: return True return False @lru_cache(maxsize=128, typed=False) def get_all_measurement_names(self, topology_id: str, cluster: str, environ: str) -> List[str]: """ Gets a list of the measurement names present in the configured InfluxDB database for the topology with the supplied credentials. Arguments: topology (str): The topology ID string. cluster (str): The cluster name. environ (str): The environment that the topology is running in. Returns: List[str]: A list of measurement name strings. """ self.client.switch_database(create_db_name(self.database_prefix, topology_id, cluster, environ)) return [measurement.get("name") for measurement in self.client.get_list_measurements()] def get_metric_measurement_names( self, database: str, metric_name: str, metric_regex: str, force: bool=False) -> List[str]: """ Gets a list of measurement names from the supplied database that are related to the supplied metric using the supplied regex string. Metric name search as cached and so repeated calls will not query the database unless the force flag is used. Arguments: database (str): The name of the influx database to be searched. metric_name (str): The name of the metric whose measurement names are required (this is used for cache keys and logging). metric_regex (str): The regex to be used to search for measurement names. 
            force (bool):   Flag indicating if the cache should be bypassed.
                            Defaults to false.

        Returns:
            List[str]: A list of measurement name strings from the specified
            database.

        Raises:
            RuntimeError:   If the specified database has no measurements
                            matching the supplied metric regex.
        """

        # Check to see if we have already queried Influx for the metric
        # measurement names, if not query them and cache the results.
        if database not in self.metric_name_cache[metric_name] or force:

            LOG.info("Finding measurement names for metric: %s from "
                     "database: %s", metric_name, database)

            # Find all the measurements for each bolt component
            measurement_query: str = (f"SHOW MEASUREMENTS ON \"{database}\" "
                                      f"WITH MEASUREMENT =~ {metric_regex}")

            measurement_names: List[str] = \
                [point["name"] for point in
                 self.client.query(measurement_query).get_points()]

            if not measurement_names:
                msg: str = (f"No measurements found in database: {database} "
                            f"for metric: {metric_name}")
                LOG.error(msg)
                raise RuntimeError(msg)
            else:
                LOG.info("Found %d measurement names for metric: %s",
                         len(measurement_names), metric_name)

            self.metric_name_cache[metric_name][database] = measurement_names

        else:
            LOG.info("Using cached measurement names for metric: %s from "
                     "database: %s", metric_name, database)

        return self.metric_name_cache[metric_name][database]

    def get_service_times(self, topology_id: str, cluster: str, environ: str,
                          start: dt.datetime, end: dt.datetime,
                          **kwargs: Union[str, int, float]) -> pd.DataFrame:
        """ Gets a time series of the service times of each of the bolt
        instances in the specified topology.

        Arguments:
            topology (str): The topology ID string.
            cluster (str):  The cluster name.
            environ (str):  The environment that the topology is running in.
            start (datetime.datetime):  UTC datetime instance for the start of
                                        the metrics gathering period.
            end (datetime.datetime):    UTC datetime instance for the end of
                                        the metrics gathering period.

        Returns:
            pandas.DataFrame:   A DataFrame containing the service time
            measurements as a timeseries. Each row represents a measurement
            with the following columns:

            * timestamp: The UTC timestamp for the metric,
            * component: The component this metric comes from,
            * task: The instance ID number for the instance that the metric
              comes from,
            * container: The ID for the container this metric comes from,
            * stream: The name of the incoming stream from which the tuples
              that lead to this metric came from,
            * source_component: The name of the component the stream's source
              instance belongs to,
            * execute_latency: The average execute latency during the metric
              sample period.
""" start_time: str = convert_datetime_to_rfc3339(start) end_time: str = convert_datetime_to_rfc3339(end) database: str = create_db_name(self.database_prefix, topology_id, cluster, environ) LOG.info("Fetching service times for topology: %s on cluster: %s in " "environment: %s for a %s second time period between %s and " "%s", topology_id, cluster, environ, (end-start).total_seconds(), start_time, end_time) self.client.switch_database(database) metric_name: str = "execute-latency" metric_regex: str = "/execute\-latency\/+.*\/+.*/" measurement_names: List[str] = self.get_metric_measurement_names( database, metric_name, metric_regex) output: List[Dict[str, Union[str, int, dt.datetime]]] = [] for measurement_name in measurement_names: _, source_component, stream = measurement_name.split("/") query_str: str = (f"SELECT Component, Instance, value " f"FROM \"{measurement_name}\" " f"WHERE time >= '{start_time}' " f"AND time <= '{end_time}'") LOG.debug("Querying %s measurements with influx QL statement: %s", metric_name, query_str) results: ResultSet = self.client.query(query_str) for point in results.get_points(): instance: Optional[re.Match] = re.search( INSTANCE_NAME_RE, point["Instance"]) if instance: instance_dict: Dict[str, str] = instance.groupdict() else: LOG.warning("Could not parse instance name: %s", point["Instance"]) continue row: Dict[str, Union[str, int, dt.datetime]] = { "time": convert_rfc339_to_datetime(point["time"]), "component": point["Component"], "task": int(instance_dict["task"]), "container": int(instance_dict["container"]), "stream": stream, "source_component": source_component, "execute_latency": float(point["value"])} output.append(row) return pd.DataFrame(output) def get_emit_counts(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets a time series of the emit count of each of the instances in the specified topology. Arguments: topology (str): The topology ID string. cluster (str): The cluster name. environ (str): The environment that the topology is running in. start (datetime.datetime): UTC datetime instance for the start of the metrics gathering period. end (datetime.datetime): UTC datetime instance for the end of the metrics gathering period. Returns: pandas.DataFrame: A DataFrame containing the emit count measurements as a timeseries. Each row represents a measurement with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the outgoing stream from which the tuples that lead to this metric came from, * emit_count: The emit count during the metric time period. 
""" start_time: str = convert_datetime_to_rfc3339(start) end_time: str = convert_datetime_to_rfc3339(end) database: str = create_db_name(self.database_prefix, topology_id, cluster, environ) LOG.info("Fetching emit counts for topology: %s on cluster: %s in " "environment: %s for a %s second time period between %s and " "%s", topology_id, cluster, environ, (end-start).total_seconds(), start_time, end_time) self.client.switch_database(database) metric_name: str = "emit-count" metric_regex: str = "/emit\-count\/+.*/" measurement_names: List[str] = self.get_metric_measurement_names( database, metric_name, metric_regex) output: List[Dict[str, Union[str, int, dt.datetime]]] = [] for measurement_name in measurement_names: _, stream = measurement_name.split("/") query_str: str = (f"SELECT Component, Instance, value " f"FROM \"{measurement_name}\" " f"WHERE time >= '{start_time}' " f"AND time <= '{end_time}'") LOG.debug("Querying %s measurements with influx QL statement: %s", metric_name, query_str) results: ResultSet = self.client.query(query_str) for point in results.get_points(): instance: Optional[re.Match] = re.search( INSTANCE_NAME_RE, point["Instance"]) if instance: instance_dict: Dict[str, str] = instance.groupdict() else: LOG.warning("Could not parse instance name: %s", point["Instance"]) continue row: Dict[str, Union[str, int, dt.datetime]] = { "timestamp": convert_rfc339_to_datetime(point["time"]), "component": point["Component"], "task": int(instance_dict["task"]), "container": int(instance_dict["container"]), "stream": stream, "emit_count": int(point["value"])} output.append(row) return pd.DataFrame(output) def get_execute_counts(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets a time series of the service times of each of the bolt instances in the specified topology Arguments: topology (str): The topology ID string. cluster (str): The cluster name. environ (str): The environment that the topology is running in. start (datetime.datetime): UTC datetime instance for the start of the metrics gathering period. end (datetime.datetime): UTC datetime instance for the end of the metrics gathering period. Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from. * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * source_component: The name of the component the stream's source instance belongs to, * execute_count: The execute count during the metric time period. 
""" start_time: str = convert_datetime_to_rfc3339(start) end_time: str = convert_datetime_to_rfc3339(end) database: str = create_db_name(self.database_prefix, topology_id, cluster, environ) LOG.info("Fetching execute counts for topology: %s on cluster: %s in " "environment: %s for a %s second time period between %s and " "%s", topology_id, cluster, environ, (end-start).total_seconds(), start_time, end_time) self.client.switch_database(database) metric_name: str = "execute-count" metric_regex: str = "/execute\-count\/+.*\/+.*/" measurement_names: List[str] = self.get_metric_measurement_names( database, metric_name, metric_regex) output: List[Dict[str, Union[str, int, dt.datetime]]] = [] for measurement_name in measurement_names: _, source_component, stream = measurement_name.split("/") query_str: str = (f"SELECT Component, Instance, value " f"FROM \"{measurement_name}\" " f"WHERE time >= '{start_time}' " f"AND time <= '{end_time}'") LOG.debug("Querying %s measurements with influx QL statement: %s", metric_name, query_str) results: ResultSet = self.client.query(query_str) for point in results.get_points(): instance: Optional[re.Match] = re.search( INSTANCE_NAME_RE, point["Instance"]) if instance: instance_dict: Dict[str, str] = instance.groupdict() else: LOG.warning("Could not parse instance name: %s", point["Instance"]) continue row: Dict[str, Union[str, int, dt.datetime]] = { "timestamp": convert_rfc339_to_datetime(point["time"]), "component": point["Component"], "task": int(instance_dict["task"]), "container": int(instance_dict["container"]), "stream": stream, "source_component": source_component, "execute_count": int(point["value"])} output.append(row) return pd.DataFrame(output) def get_complete_latencies(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float] ) -> pd.DataFrame: """ Gets the complete latencies, as a timeseries, for every instance of the of all the spout components of the specified topology. The start and end times define the window over which to gather the metrics. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). start (datetime): utc datetime instance for the start of the metrics gathering period. end (datetime): utc datetime instance for the end of the metrics gathering period. Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, stream: The name of the incoming stream from which the tuples that lead to this metric came from, * latency_ms: The average execute latency measurement in milliseconds for that metric time period. Raises: RuntimeWarning: If the specified topology has a reliability mode that does not enable complete latency. ConnectionError: If the physical plan cannot be extracted from the Heron Tracker API. """ # First we need to check that the supplied topology will actually have # complete latencies. Only ATLEAST_ONCE and EXACTLY_ONCE will have # complete latency values as acking is disabled for ATMOST_ONCE. 
        try:
            physical_plan: Dict[str, Any] = tracker.get_physical_plan(
                self.tracker_url, cluster, environ, topology_id)
        except ConnectionError as conn_err:
            conn_msg: str = (f"Unable to connect to Heron Tracker API at: "
                             f"{self.tracker_url}. Cannot retrieve physical "
                             f"plan for topology: {topology_id}")
            LOG.error(conn_msg)
            raise ConnectionError(conn_msg)

        if (physical_plan["config"]["topology.reliability.mode"] ==
                "ATMOST_ONCE"):
            rm_msg: str = (f"Topology {topology_id} reliability mode is set "
                           f"to ATMOST_ONCE. Complete latency is not "
                           f"available for these types of topologies")
            LOG.warning(rm_msg)
            warnings.warn(rm_msg, RuntimeWarning)
            return pd.DataFrame()

        start_time: str = convert_datetime_to_rfc3339(start)
        end_time: str = convert_datetime_to_rfc3339(end)

        database: str = create_db_name(self.database_prefix, topology_id,
                                       cluster, environ)

        LOG.info("Fetching complete latencies for topology: %s on cluster: "
                 "%s in environment: %s for a %s second time period between "
                 "%s and %s", topology_id, cluster, environ,
                 (end-start).total_seconds(), start_time, end_time)

        self.client.switch_database(database)

        metric_name: str = "complete-latency"
        metric_regex: str = r"/complete\-latency\/+.*/"

        measurement_names: List[str] = self.get_metric_measurement_names(
            database, metric_name, metric_regex)

        output: List[Dict[str, Union[str, int, dt.datetime]]] = []

        for measurement_name in measurement_names:

            _, stream = measurement_name.split("/")

            query_str: str = (f"SELECT Component, Instance, value "
                              f"FROM \"{measurement_name}\" "
                              f"WHERE time >= '{start_time}' "
                              f"AND time <= '{end_time}'")

            LOG.debug("Querying %s measurements with influx QL statement: %s",
                      metric_name, query_str)

            results: ResultSet = self.client.query(query_str)

            for point in results.get_points():

                instance: Optional[re.Match] = re.search(
                    INSTANCE_NAME_RE, point["Instance"])

                if instance:
                    instance_dict: Dict[str, str] = instance.groupdict()
                else:
                    LOG.warning("Could not parse instance name: %s",
                                point["Instance"])
                    continue

                row: Dict[str, Union[str, int, dt.datetime]] = {
                    "timestamp": convert_rfc339_to_datetime(point["time"]),
                    "component": point["Component"],
                    "task": int(instance_dict["task"]),
                    "container": int(instance_dict["container"]),
                    "stream": stream,
                    "latency_ms": float(point["value"])}

                output.append(row)

        return pd.DataFrame(output)

    def get_arrival_rates(self, topology_id: str, cluster: str, environ: str,
                          start: dt.datetime, end: dt.datetime,
                          **kwargs: Union[str, int, float]) -> pd.DataFrame:
        """ Gets a time series of the arrival rates, in units of tuples per
        second, for each of the instances in the specified topology."""
        pass

    def get_receive_counts(self, topology_id: str, cluster: str, environ: str,
                           start: dt.datetime, end: dt.datetime,
                           **kwargs: Union[str, int, float]) -> pd.DataFrame:
        """ Gets a time series of the receive counts of each of the bolt
        instances in the specified topology."""
        msg: str = ("The custom Caladrius receive-count metric is not yet "
                    "available via the Influx metrics database")
        LOG.error(msg)
        raise NotImplementedError(msg)
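

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It shows how a
# HeronInfluxDBClient might be constructed and queried for one metric. The
# config values, topology ID, cluster and environment names below are
# illustrative assumptions only; the config keys mirror those read in
# HeronInfluxDBClient.__init__.
if __name__ == "__main__":

    example_config: dict = {
        "influx.host": "localhost",                     # assumed InfluxDB host
        "influx.port": 8086,                            # assumed InfluxDB port
        "influx.database.prefix": "heron",              # assumed DB name prefix
        "heron.tracker.url": "http://localhost:8888",   # assumed Tracker URL
    }

    example_client = HeronInfluxDBClient(example_config)

    # Gather metrics over the last hour (naive UTC datetimes, as expected by
    # convert_datetime_to_rfc3339).
    window_end: dt.datetime = dt.datetime.utcnow()
    window_start: dt.datetime = window_end - dt.timedelta(hours=1)

    # "example-topology", "local" and "devel" are placeholder identifiers.
    service_times: pd.DataFrame = example_client.get_service_times(
        "example-topology", "local", "devel", window_start, window_end)

    print(service_times.head())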