in metrics/heron/tmaster/client.py [0:0]
def get_complete_latencies(self, topology_id: str, cluster: str,
                           environ: str, start: dt.datetime,
                           end: dt.datetime,
                           **kwargs: Union[str, int, float]
                           ) -> pd.DataFrame:
    """ Gets the complete latencies, as a timeseries, for every instance of
    all the spout components of the specified topology. The start and end
    times define the window over which to gather the metrics. The window
    duration should be less than 3 hours as this is the limit of what the
    Topology master stores.

    Arguments:
        topology_id (str): The topology identification string.
        cluster (str): The cluster the topology is running in.
        environ (str): The environment the topology is running in (eg.
                       prod, devel, test, etc).
        start (datetime): utc datetime instance for the start of the
                          metrics gathering period.
        end (datetime): utc datetime instance for the end of the
                        metrics gathering period.
        **kwargs: Accepted for interface compatibility; not used by this
                  method.

    Returns:
        pandas.DataFrame: A DataFrame containing the complete latency
        measurements as a timeseries, formed by concatenating the per-spout
        frames returned by get_spout_complete_latencies. Each row represents
        a measurement (aggregated over one minute) with the following
        columns:

        * timestamp: The UTC timestamp for the metric,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from,
        * stream: The name of the stream this metric relates to,
        * latency_ms: The average complete latency measurement in
          milliseconds for that metric time period.

        An empty DataFrame is returned if the topology's reliability mode
        is ATMOST_ONCE, or if no spout latencies could be fetched (no
        spouts, or every spout request failed with an HTTP error).

    Warns:
        RuntimeWarning: If the specified topology has a reliability mode
                        (ATMOST_ONCE) that does not enable complete latency.
                        The warning is emitted via ``warnings.warn`` and an
                        empty DataFrame is returned; it is not raised unless
                        the caller's warning filters turn it into an error.
    """
    LOG.info("Getting complete latencies for topology %s over a %d second "
             "period from %s to %s", topology_id,
             (end-start).total_seconds(), start.isoformat(),
             end.isoformat())

    logical_plan, start_time, end_time = self._query_setup(
        topology_id, cluster, environ, start, end)

    # First we need to check that the supplied topology will actually have
    # complete latencies. Only ATLEAST_ONCE and EXACTLY_ONCE will have
    # complete latency values as acking is disabled for ATMOST_ONCE.
    physical_plan: Dict[str, Any] = tracker.get_physical_plan(
        self.tracker_url, cluster, environ, topology_id)

    if (physical_plan["config"]["topology.reliability.mode"]
            == "ATMOST_ONCE"):
        rm_msg: str = (f"Topology {topology_id} reliability mode is set "
                       f"to ATMOST_ONCE. Complete latency is not "
                       f"available for these types of topologies")
        LOG.warning(rm_msg)
        warnings.warn(rm_msg, RuntimeWarning)
        return pd.DataFrame()

    spouts: Dict[str, Any] = logical_plan["spouts"]

    # Collect each spout's frame and concatenate once at the end; repeated
    # DataFrame.append is quadratic and the method was removed in pandas 2.0.
    spout_frames = []
    for spout_component in spouts:
        try:
            spout_complete_latencies: pd.DataFrame = \
                self.get_spout_complete_latencies(
                    topology_id, cluster, environ, spout_component,
                    start_time, end_time, logical_plan)
        except HTTPError as http_error:
            LOG.warning("Fetching complete latencies for component %s "
                        "failed with status code %s", spout_component,
                        str(http_error.response.status_code))
            # Skip this spout. Falling through would reuse the previous
            # spout's frame (duplicating data) or reference an unbound name
            # if the very first spout fails.
            continue
        spout_frames.append(spout_complete_latencies)

    if not spout_frames:
        # Keep the return type consistent with the annotation and with the
        # ATMOST_ONCE early return above.
        return pd.DataFrame()

    return pd.concat(spout_frames, ignore_index=True)