in metrics/heron/influxdb/client.py [0:0]
def get_complete_latencies(self, topology_id: str, cluster: str,
                           environ: str, start: dt.datetime,
                           end: dt.datetime,
                           **kwargs: Union[str, int, float]
                           ) -> pd.DataFrame:
    """ Gets the complete latencies, as a timeseries, for every instance of
    all the spout components of the specified topology. The start and end
    times define the window over which to gather the metrics.

    Arguments:
        topology_id (str): The topology identification string.
        cluster (str): The cluster the topology is running in.
        environ (str): The environment the topology is running in (eg.
                       prod, devel, test, etc).
        start (datetime): utc datetime instance for the start of the
                          metrics gathering period.
        end (datetime): utc datetime instance for the end of the
                        metrics gathering period.
        **kwargs: Accepted for interface compatibility; not used by this
                  method.

    Returns:
        pandas.DataFrame: A DataFrame containing the complete latency
        measurements as a timeseries. Each row represents a measurement
        with the following columns:

        * timestamp: The UTC timestamp for the metric,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from,
        * stream: The stream name, taken from the part of the measurement
          name after its first "/",
        * latency_ms: The complete latency measurement in milliseconds for
          that metric time period.

        Points whose instance name cannot be parsed are skipped (a warning
        is logged). An empty DataFrame is returned when the topology's
        reliability mode is ATMOST_ONCE.

    Raises:
        ConnectionError: If the physical plan cannot be extracted from the
                         Heron Tracker API. The original error is chained
                         as the cause.

    Warns:
        RuntimeWarning: If the specified topology has a reliability mode
                        (ATMOST_ONCE) that does not enable complete
                        latency. This is issued via ``warnings.warn``, not
                        raised, and an empty DataFrame is returned.
    """
    # First we need to check that the supplied topology will actually have
    # complete latencies. Only ATLEAST_ONCE and EXACTLY_ONCE will have
    # complete latency values as acking is disabled for ATMOST_ONCE.
    try:
        physical_plan: Dict[str, Any] = tracker.get_physical_plan(
            self.tracker_url, cluster, environ, topology_id)
    except ConnectionError as conn_err:
        conn_msg: str = (f"Unable to connect to Heron Tracker API at: "
                         f"{self.tracker_url}. Cannot retrieve physical "
                         f"plan for topology: {topology_id}")
        LOG.error(conn_msg)
        # Chain the original error so its cause/traceback is preserved.
        raise ConnectionError(conn_msg) from conn_err

    if (physical_plan["config"]["topology.reliability.mode"]
            == "ATMOST_ONCE"):
        rm_msg: str = (f"Topology {topology_id} reliability mode is set "
                       f"to ATMOST_ONCE. Complete latency is not "
                       f"available for these types of topologies")
        LOG.warning(rm_msg)
        warnings.warn(rm_msg, RuntimeWarning)
        return pd.DataFrame()

    start_time: str = convert_datetime_to_rfc3339(start)
    end_time: str = convert_datetime_to_rfc3339(end)

    database: str = create_db_name(self.database_prefix, topology_id,
                                   cluster, environ)

    LOG.info("Fetching complete latencies for topology: %s on cluster: %s "
             "in environment: %s for a %s second time period between %s "
             "and %s", topology_id, cluster, environ,
             (end - start).total_seconds(), start_time, end_time)

    self.client.switch_database(database)

    metric_name: str = "complete-latency"
    # Raw string: "\-" and "\/" are regex escapes meant for InfluxDB, not
    # Python escape sequences. The string value is unchanged, but the
    # invalid-escape warning is avoided.
    metric_regex: str = r"/complete\-latency\/+.*/"

    measurement_names: List[str] = self.get_metric_measurement_names(
        database, metric_name, metric_regex)

    output: List[Dict[str, Union[str, int, float, dt.datetime]]] = []

    for measurement_name in measurement_names:
        # Split only on the first "/" so that a measurement name with
        # additional "/" characters (the regex allows "\/+.*") does not
        # raise ValueError and abort the whole fetch.
        _, stream = measurement_name.split("/", 1)

        query_str: str = (f"SELECT Component, Instance, value "
                          f"FROM \"{measurement_name}\" "
                          f"WHERE time >= '{start_time}' "
                          f"AND time <= '{end_time}'")

        LOG.debug("Querying %s measurements with influx QL statement: %s",
                  metric_name, query_str)

        results: ResultSet = self.client.query(query_str)

        for point in results.get_points():

            instance: Optional[re.Match] = re.search(
                INSTANCE_NAME_RE, point["Instance"])

            if instance:
                instance_dict: Dict[str, str] = instance.groupdict()
            else:
                LOG.warning("Could not parse instance name: %s",
                            point["Instance"])
                continue

            row: Dict[str, Union[str, int, float, dt.datetime]] = {
                "timestamp": convert_rfc339_to_datetime(point["time"]),
                "component": point["Component"],
                "task": int(instance_dict["task"]),
                "container": int(instance_dict["container"]),
                "stream": stream,
                "latency_ms": float(point["value"])}

            output.append(row)

    return pd.DataFrame(output)