metrics/heron/tmaster/client.py (408 lines of code) (raw):

# Copyright 2018 Twitter, Inc. # Licensed under the Apache License, Version 2.0 # http://www.apache.org/licenses/LICENSE-2.0 """ This module contains classes and methods for extracting metrics from the Heron Topology Master instance. """ import logging import warnings import datetime as dt from typing import Dict, List, Any, Callable, Union, Tuple, Optional import pandas as pd from requests.exceptions import HTTPError from caladrius.metrics.heron.client import HeronMetricsClient from caladrius.common.heron import tracker from caladrius.config.keys import ConfKeys LOG: logging.Logger = logging.getLogger(__name__) # pylint: disable=too-many-locals, too-many-arguments # Type definitions ROW_DICT = Dict[str, Union[str, int, float, dt.datetime, None]] # The TMaster metrics are aggregated into minute long periods by default DEFAULT_METRIC_PERIOD: int = 60 def time_check(start: dt.datetime, end: dt.datetime, time_limit_hrs: float) -> None: """ Checks the time period, defined by the supplied start and end points, against the period defined from now back by the supplied time limit in hours. If the time check passes then nothing will be returned. Arguments: start (datetime): The start of the time period. Should be UTC. end (datetime): The end of the time period. Should be UTC. time_limit_hrs (float): The number of hours back from now that define the allowed time period. Raises: RuntimeError: If the supplied time period is not within the defined limit or if the end time is before the start time. RuntimeWarning: If the supplied time period crosses the limits of the metrics storage period. """ if end < start: msg: str = (f"The supplied end time ({end.isoformat}) is before the " f"supplied start time ({start.isoformat}). No data will " f"be returned.") LOG.error(msg) raise RuntimeError(msg) now: dt.datetime = dt.datetime.now(dt.timezone.utc) limit: dt.datetime = now - dt.timedelta(hours=time_limit_hrs) if start < limit and end < limit: limit_msg: str = (f"The defined time period ({start.isoformat()} to " f"{end.isoformat()}) is outside of the " f"{time_limit_hrs} hours of data stored by the " f"Topology Master. No data will be returned.") LOG.error(limit_msg) raise RuntimeError(limit_msg) if start < limit and end > limit: truncated_duration: float = round(((end - limit).total_seconds() / 3600), 2) truncated_msg: str = (f"The start ({start.isoformat()}) of the " f"supplied time window is beyond the " f"{time_limit_hrs} hours stored by the Topology " f"Master. Results will be limited to " f"{truncated_duration} hours from " f"{limit.isoformat()} to {end.isoformat()}") LOG.warning(truncated_msg) warnings.warn(truncated_msg, RuntimeWarning) def instance_timelines_to_dataframe( instance_timelines: dict, stream: Optional[str], measurement_name: str, conversion_func: Callable[[str], Union[str, int, float]] = None, source_component: str = None) -> pd.DataFrame: """ Converts the timeline dictionaries of a *single metric* into a single combined DataFrame for all instances. All timestamps are converted to UTC Python datetime objects and the returned DataFrame (for each instance) is sorted by ascending date. Arguments: instance_timelines (dict): A dictionary of instance metric timelines, where each key is an instance name linking to a dictionary of <timestamp> : <measurement> pairs. stream (str): The stream name that these metrics are related to. measurement_name (str): The name of the measurements being processed. This will be used as the measurement column heading. conversion_func (function): An optional function for converting the measurement in the timeline. If not supplied the measurement will be left as a string. Returns: pandas.DataFrame: A DataFrame containing the timelines of all instances in the supplied dictionary. """ output: List[ROW_DICT] = [] instance_name: str timeline: Dict[str, str] for instance_name, timeline in instance_timelines.items(): details = tracker.parse_instance_name(instance_name) instance_list: List[ROW_DICT] = [] timestamp_str: str measurement_str: str for timestamp_str, measurement_str in timeline.items(): timestamp: dt.datetime = \ dt.datetime.utcfromtimestamp(int(timestamp_str)) if "nan" in measurement_str: measurement: Union[str, int, float, None] = None else: if conversion_func: measurement = conversion_func(measurement_str) else: measurement = measurement_str row: ROW_DICT = { "timestamp": timestamp, "container": details["container"], "task": details["task_id"], "component": details["component"], measurement_name: measurement} if stream: row["stream"] = stream if source_component: row["source_component"] = source_component instance_list.append(row) # Because the original dict returned by the tracker is # unsorted we need to sort the rows by ascending time instance_list.sort( key=lambda instance: instance["timestamp"]) output.extend(instance_list) return pd.DataFrame(output) def str_nano_to_float_milli(nano_str: str) -> float: """ Converts a string of a nano measurement into a millisecond float value. """ return float(nano_str) / 1000000.0 class HeronTMasterClient(HeronMetricsClient): """ Class for extracting metrics from the Heron Topology Master metrics store. """ def __init__(self, config: dict) -> None: super().__init__(config) self.tracker_url = config[ConfKeys.HERON_TRACKER_URL.value] self.time_limit_hrs = \ config.get(ConfKeys.HERON_TMASTER_METRICS_MAX_HOURS.value, 3) LOG.info("Created Topology Master metrics client using Heron Tracker " "at: %s", self.tracker_url) def __hash__(self) -> int: return hash(self.tracker_url) def __eq__(self, other: object) -> bool: if not isinstance(other, HeronTMasterClient): return False if self.tracker_url == other.tracker_url: return True return False def _query_setup(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, ) -> Tuple[Dict[str, Any], int, int]: """ Helper method for setting up each of the query methods with the required variables.""" time_check(start, end, self.time_limit_hrs) start_time: int = int(round(start.timestamp())) end_time: int = int(round(end.timestamp())) logical_plan: Dict[str, Any] = tracker.get_logical_plan( self.tracker_url, cluster, environ, topology_id) return logical_plan, start_time, end_time def get_component_service_times(self, topology_id: str, cluster: str, environ: str, component_name: str, start: int, end: int, logical_plan: Dict[str, Any]=None) -> pd.DataFrame: """ Gets the service times, as a timeseries, for every instance of the specified component of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less then 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). component_name (str): The name of the component whose metrics are required. start (int): Start time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). end (int): End time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). logical_plan (dict): Optional dictionary logical plan returned by the Heron Tracker API. If not supplied this method will call the API to get the logical plan. Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric time period, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * execute-latency-ms: The average execute latency measurement in milliseconds for that metric time period. """ LOG.info("Getting service time metrics for component %s of topology " "%s", component_name, topology_id) if not logical_plan: LOG.debug("Logical plan not supplied, fetching from Heron Tracker") logical_plan = tracker.get_logical_plan(self.tracker_url, cluster, environ, topology_id) incoming_streams: List[Tuple[str, str]] = \ tracker.incoming_sources_and_streams(logical_plan, component_name) metrics: List[str] = ["__execute-latency/" + source + "/" + stream for source, stream in incoming_streams] results: Dict[str, Any] = tracker.get_metrics_timeline( self.tracker_url, cluster, environ, topology_id, component_name, start, end, metrics) output: pd.DataFrame = None for stream_metric, instance_timelines in results["timeline"].items(): metric_list: List[str] = stream_metric.split("/") incoming_source: str = metric_list[1] incoming_stream: str = metric_list[2] instance_tls_df: pd.DataFrame = instance_timelines_to_dataframe( instance_timelines, incoming_stream, "latency_ms", str_nano_to_float_milli, incoming_source) if output is None: output = instance_tls_df else: output = output.append(instance_tls_df, ignore_index=True) return output def get_service_times(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets the service times, as a timeseries, for every instance of the of all the bolt components of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less than 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. start (datetime): utc datetime instance for the start of the metrics gathering period. end (datetime): utc datetime instance for the end of the metrics gathering period. **cluster (str): The cluster the topology is running in. **environ (str): The environment the topology is running in (eg. prod, devel, test, etc). Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp:The UTC timestamp for the metric time period, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * latency_ms: The average execute latency measurement in milliseconds for that metric time period. """ LOG.info("Getting service times for topology %s over a %d second " "period from %s to %s", topology_id, (end-start).total_seconds(), start.isoformat(), end.isoformat()) logical_plan, start_time, end_time = self._query_setup( topology_id, cluster, environ, start, end) output: pd.DataFrame = None bolts: Dict[str, Any] = logical_plan["bolts"] bolt_component: str for bolt_component in bolts: try: bolt_service_times: pd.DataFrame = \ self.get_component_service_times(topology_id, cluster, environ, bolt_component, start_time, end_time, logical_plan) except HTTPError as http_error: LOG.warning("Fetching execute latencies for component %s " "failed with status code %s", bolt_component, str(http_error.response.status_code)) else: if output is None: output = bolt_service_times else: output = output.append(bolt_service_times, ignore_index=True) return output def get_component_emission_counts(self, topology_id: str, cluster: str, environ: str, component_name: str, start: int, end: int, logical_plan: Dict[str, Any] = None ) -> pd.DataFrame: """ Gets the emit counts, as a timeseries, for every instance of the specified component of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less then 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). component_name (str): The name of the component whose metrics are required. start (int): Start time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). end (int): End time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). logical_plan (dict): Optional dictionary logical plan returned by the Heron Tracker API. If not supplied this method will call the API to get the logical plan. Returns: pandas.DataFrame: A DataFrame containing the emit count measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp:The UTC timestamp for the metric time period, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * emit_count: The emit count in that metric time period. """ LOG.info("Getting emit count metrics for component %s of topology " "%s", component_name, topology_id) if not logical_plan: LOG.debug("Logical plan not supplied, fetching from Heron Tracker") logical_plan = tracker.get_logical_plan(self.tracker_url, cluster, environ, topology_id) outgoing_streams: List[str] = tracker.get_outgoing_streams( logical_plan, component_name) metrics: List[str] = ["__emit-count/" + stream for stream in outgoing_streams] results: Dict[str, Any] = tracker.get_metrics_timeline( self.tracker_url, cluster, environ, topology_id, component_name, start, end, metrics) output: pd.DataFrame = None for stream_metric, instance_timelines in results["timeline"].items(): outgoing_stream: str = stream_metric.split("/")[-1] instance_tls_df: pd.DataFrame = instance_timelines_to_dataframe( instance_timelines, outgoing_stream, "emit_count", lambda m: int(float(m))) if output is None: output = instance_tls_df else: output = output.append(instance_tls_df, ignore_index=True) return output def get_emit_counts(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets the emit counts, as a timeseries, for every instance of each of the components of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less than 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. start (datetime): utc datetime instance for the start of the metrics gathering period. end (datetime): utc datetime instance for the end of the metrics gathering period. **cluster (str): The cluster the topology is running in. **environ (str): The environment the topology is running in (eg. prod, devel, test, etc). Returns: pandas.DataFrame: A DataFrame containing the emit count measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the outing stream from which the tuples that lead to this metric came from, * emit_count: The emit count during the metric time period. """ LOG.info("Getting emit counts for topology %s over a %d second " "period from %s to %s", topology_id, (end-start).total_seconds(), start.isoformat(), end.isoformat()) logical_plan, start_time, end_time = self._query_setup( topology_id, cluster, environ, start, end) output: pd.DataFrame = None components: List[str] = (list(logical_plan["spouts"].keys()) + list(logical_plan["bolts"].keys())) for component in components: try: comp_emit_counts: pd.DataFrame = \ self.get_component_emission_counts( topology_id, cluster, environ, component, start_time, end_time, logical_plan) except HTTPError as http_error: LOG.warning("Fetching emit counts for component %s failed with" " status code %s", component, str(http_error.response.status_code)) if output is None: output = comp_emit_counts else: output = output.append(comp_emit_counts, ignore_index=True) return output def get_component_execute_counts(self, topology_id: str, cluster: str, environ: str, component_name: str, start: int, end: int, logical_plan: Dict[str, Any] = None ) -> pd.DataFrame: """ Gets the execute counts, as a timeseries, for every instance of the specified component of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less then 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). component_name (str): The name of the component whose metrics are required. start (int): Start time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). end (int): End time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). logical_plan (dict): Optional dictionary logical plan returned by the Heron Tracker API. If not supplied this method will call the API to get the logical plan. Returns: pandas.DataFrame: A DataFrame containing the emit count measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric time period, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from. * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * execute_count: The execute count in that metric time period. """ LOG.info("Getting execute count metrics for component %s of topology " "%s", component_name, topology_id) if not logical_plan: LOG.debug("Logical plan not supplied, fetching from Heron Tracker") logical_plan = tracker.get_logical_plan(self.tracker_url, cluster, environ, topology_id) incoming_streams: List[Tuple[str, str]] = \ tracker.incoming_sources_and_streams(logical_plan, component_name) metrics: List[str] = ["__execute-count/" + source + "/" + stream for source, stream in incoming_streams] results: Dict[str, Any] = tracker.get_metrics_timeline( self.tracker_url, cluster, environ, topology_id, component_name, start, end, metrics) output: pd.DataFrame = None for stream_metric, instance_timelines in results["timeline"].items(): metric_list: List[str] = stream_metric.split("/") incoming_source: str = metric_list[1] incoming_stream: str = metric_list[2] instance_tls_df: pd.DataFrame = instance_timelines_to_dataframe( instance_timelines, incoming_stream, "execute_count", lambda m: int(float(m)), incoming_source) if output is None: output = instance_tls_df else: output = output.append(instance_tls_df, ignore_index=True) return output def get_execute_counts(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets the execute counts, as a timeseries, for every instance of each of the components of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less than 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. start (datetime): UTC datetime instance for the start of the metrics gathering period. end (datetime): UTC datetime instance for the end of the metrics gathering period. **cluster (str): The cluster the topology is running in. **environ (str): The environment the topology is running in (eg. prod, devel, test, etc). Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from. * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * source_component: The name of the component the stream's source instance belongs to, * execute_count: The execute count during the metric time period. """ LOG.info("Getting execute counts for topology %s over a %d second " "period from %s to %s", topology_id, (end-start).total_seconds(), start.isoformat(), end.isoformat()) logical_plan, start_time, end_time = self._query_setup( topology_id, cluster, environ, start, end) output: pd.DataFrame = None for component in logical_plan["bolts"].keys(): try: comp_execute_counts: pd.DataFrame = \ self.get_component_execute_counts(topology_id, cluster, environ, component, start_time, end_time, logical_plan) except HTTPError as http_error: LOG.warning("Fetching execute counts for component %s failed " "with status code %s", component, str(http_error.response.status_code)) if output is None: output = comp_execute_counts else: output = output.append(comp_execute_counts, ignore_index=True) return output def get_spout_complete_latencies(self, topology_id: str, cluster: str, environ: str, component_name: str, start: int, end: int, logical_plan: Dict[str, Any] = None ) -> pd.DataFrame: """ Gets the complete latency, as a timeseries, for every instance of the specified component of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less then 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). component_name (str): The name of the spout component whose metrics are required. start (int): Start time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). end (int): End time for the time period the query is run against. This should be a UTC POSIX time integer (seconds since epoch). logical_plan (dict): Optional dictionary logical plan returned by the Heron Tracker API. If not supplied this method will call the API to get the logical plan. Returns: pandas.DataFrame: A DataFrame containing the complete latency measurements as a timeseries. Each row represents a measurement (averaged over one minute) with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * stream: The name of the incoming stream from which the tuples that lead to this metric came from, * latency_ms: The average execute latency measurement in milliseconds for that metric time period. """ LOG.info("Getting complete latency metrics for component %s of " "topology %s", component_name, topology_id) if not logical_plan: LOG.debug("Logical plan not supplied, fetching from Heron Tracker") logical_plan = tracker.get_logical_plan(self.tracker_url, cluster, environ, topology_id) outgoing_streams: List[str] = \ tracker.get_outgoing_streams(logical_plan, component_name) metrics: List[str] = ["__complete-latency/" + stream for stream in outgoing_streams] results: Dict[str, Any] = tracker.get_metrics_timeline( self.tracker_url, cluster, environ, topology_id, component_name, start, end, metrics) output: pd.DataFrame = None for stream_metric, instance_timelines in results["timeline"].items(): metric_list: List[str] = stream_metric.split("/") outgoing_stream: str = metric_list[1] instance_tls_df: pd.DataFrame = instance_timelines_to_dataframe( instance_timelines, outgoing_stream, "latency_ms", str_nano_to_float_milli) if output is None: output = instance_tls_df else: output = output.append(instance_tls_df, ignore_index=True) return output def get_complete_latencies(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float] ) -> pd.DataFrame: """ Gets the complete latencies, as a timeseries, for every instance of the of all the spout components of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less than 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). start (datetime): utc datetime instance for the start of the metrics gathering period. end (datetime): utc datetime instance for the end of the metrics gathering period. Returns: pandas.DataFrame: A DataFrame containing the service time measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, stream: The name of the incoming stream from which the tuples that lead to this metric came from, * latency_ms: The average execute latency measurement in milliseconds for that metric time period. Raises: RuntimeWarning: If the specified topology has a reliability mode that does not enable complete latency. """ LOG.info("Getting complete latencies for topology %s over a %d second " "period from %s to %s", topology_id, (end-start).total_seconds(), start.isoformat(), end.isoformat()) logical_plan, start_time, end_time = self._query_setup( topology_id, cluster, environ, start, end) # First we need to check that the supplied topology will actually have # complete latencies. Only ATLEAST_ONCE and EXACTLY_ONCE will have # complete latency values as acking is disabled for ATMOST_ONCE. physical_plan: Dict[str, Any] = tracker.get_physical_plan( self.tracker_url, cluster, environ, topology_id) if (physical_plan["config"] ["topology.reliability.mode"] == "ATMOST_ONCE"): rm_msg: str = (f"Topology {topology_id} reliability mode is set " f"to ATMOST_ONCE. Complete latency is not " f"available for these types of topologies") LOG.warning(rm_msg) warnings.warn(rm_msg, RuntimeWarning) return pd.DataFrame() output: pd.DataFrame = None spouts: Dict[str, Any] = logical_plan["spouts"] for spout_component in spouts: try: spout_complete_latencies: pd.DataFrame = \ self.get_spout_complete_latencies(topology_id, cluster, environ, spout_component, start_time, end_time, logical_plan) except HTTPError as http_error: LOG.warning("Fetching execute latencies for component %s " "failed with status code %s", spout_component, str(http_error.response.status_code)) if output is None: output = spout_complete_latencies else: output = output.append(spout_complete_latencies, ignore_index=True) return output def get_calculated_arrival_rates(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: """ Gets the arrival rates, as a timeseries, for every instance of each of the bolt components of the specified topology. The start and end times define the window over which to gather the metrics. The window duration should be less than 3 hours as this is the limit of what the Topology master stores. Arguments: topology_id (str): The topology identification string. start (datetime): utc datetime instance for the start of the metrics gathering period. end (datetime): utc datetime instance for the end of the metrics gathering period. **cluster (str): The cluster the topology is running in. **environ (str): The environment the topology is running in (eg. prod, devel, test, etc). Returns: pandas.DataFrame: A DataFrame containing the arrival rate measurements as a timeseries. Each row represents a measurement (aggregated over one minute) with the following columns: * timestamp: The UTC timestamp for the metric, * component: The component this metric comes from, * task: The instance ID number for the instance that the metric comes from, * container: The ID for the container this metric comes from, * arrival_count: The number of arrivals (across all streams) at each instance. * arrival_rate_tps: The arrival rate at each instance (across all streams) in units of tuples per second. """ LOG.info("Getting arrival rates for topology %s over a %d second " "period from %s to %s", topology_id, (end-start).total_seconds(), start.isoformat(), end.isoformat()) execute_counts: pd.DataFrame = self.get_execute_counts( topology_id, cluster, environ, start, end) arrivals: pd.DataFrame = \ (execute_counts.groupby(["task", "component", "timestamp"]) .sum().reset_index() .rename(index=str, columns={"execute_count": "arrival_count"})) arrivals["arrival_rate_tps"] = (arrivals["arrival_count"] / DEFAULT_METRIC_PERIOD) return arrivals def get_receive_counts(self, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, **kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = ("The custom caladrius receive-count metrics is not yet " "available via the TMaster metrics database") LOG.error(msg) raise NotImplementedError(msg) def get_incoming_queue_sizes(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, **kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg) def get_cpu_load(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, ** kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg) def get_gc_time(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, ** kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg) def get_num_packets_received(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, ** kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg) def get_packet_arrival_rate(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, **kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg) def get_tuple_arrivals_at_stmgr(self, topology_id: str, cluster: str, environ: str, start: [dt.datetime] = None, end: [dt.datetime] = None, **kwargs: Union[str, int, float]) -> pd.DataFrame: msg: str = "Unimplemented" LOG.error(msg) raise NotImplementedError(msg)