in metrics/heron/tmaster/client.py [0:0]
def get_spout_complete_latencies(self, topology_id: str, cluster: str,
                                 environ: str, component_name: str,
                                 start: int, end: int,
                                 logical_plan: Dict[str, Any] = None
                                 ) -> pd.DataFrame:
    """ Gets the complete latency, as a timeseries, for every instance of
    the specified component of the specified topology. The start and end
    times define the window over which to gather the metrics. The window
    duration should be less than 3 hours as this is the limit of what the
    Topology master stores.
    Arguments:
        topology_id (str):  The topology identification string.
        cluster (str):  The cluster the topology is running in.
        environ (str): The environment the topology is running in (eg.
                       prod, devel, test, etc).
        component_name (str): The name of the spout component whose
                              metrics are required.
        start (int):    Start time for the time period the query is run
                        against. This should be a UTC POSIX time integer
                        (seconds since epoch).
        end (int):  End time for the time period the query is run against.
                    This should be a UTC POSIX time integer (seconds since
                    epoch).
        logical_plan (dict):    Optional dictionary logical plan returned
                                by the Heron Tracker API. If not supplied
                                this method will call the API to get the
                                logical plan.
    Returns:
        pandas.DataFrame:   A DataFrame containing the complete latency
        measurements as a timeseries. Each row represents a measurement
        (averaged over one minute) with the following columns:

        * timestamp: The UTC timestamp for the metric,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from,
        * stream: The name of the incoming stream from which the tuples
          that lead to this metric came from,
        * latency_ms: The average complete latency measurement in
          milliseconds for that metric time period.
    """
    LOG.info("Getting complete latency metrics for component %s of "
             "topology %s", component_name, topology_id)

    if not logical_plan:
        LOG.debug("Logical plan not supplied, fetching from Heron Tracker")
        logical_plan = tracker.get_logical_plan(self.tracker_url, cluster,
                                                environ, topology_id)

    # Complete latency is reported per outgoing stream of the spout, so
    # build one metric name per stream.
    outgoing_streams: List[str] = \
        tracker.get_outgoing_streams(logical_plan, component_name)

    metrics: List[str] = ["__complete-latency/" + stream
                          for stream in outgoing_streams]

    results: Dict[str, Any] = tracker.get_metrics_timeline(
        self.tracker_url, cluster, environ, topology_id, component_name,
        start, end, metrics)

    # Collect the per-stream DataFrames and concatenate once at the end.
    # (DataFrame.append was deprecated in pandas 1.4 and removed in 2.0,
    # and repeated appends copy the accumulated frame each iteration.)
    frames: List[pd.DataFrame] = []

    for stream_metric, instance_timelines in results["timeline"].items():
        # Metric names have the form "__complete-latency/<stream>".
        outgoing_stream: str = stream_metric.split("/")[1]

        frames.append(instance_timelines_to_dataframe(
            instance_timelines, outgoing_stream, "latency_ms",
            str_nano_to_float_milli))

    if not frames:
        # Preserve the original behaviour of returning None when the
        # Tracker response contains no timelines.
        return None

    return pd.concat(frames, ignore_index=True)