in metrics/heron/tmaster/client.py [0:0]
def get_emit_counts(self, topology_id: str, cluster: str, environ: str,
                    start: dt.datetime, end: dt.datetime,
                    **kwargs: Union[str, int, float]) -> pd.DataFrame:
    """ Gets the emit counts, as a timeseries, for every instance of each
    of the components of the specified topology. The start and end times
    define the window over which to gather the metrics. The window duration
    should be less than 3 hours as this is the limit of what the Topology
    master stores.

    Components whose emit-count request fails with an HTTP error are logged
    and skipped; results for the remaining components are still returned.

    Arguments:
        topology_id (str):  The topology identification string.
        cluster (str):  The cluster the topology is running in.
        environ (str):  The environment the topology is running in (eg.
                        prod, devel, test, etc).
        start (datetime):   utc datetime instance for the start of the
                            metrics gathering period.
        end (datetime): utc datetime instance for the end of the metrics
                        gathering period.
        **kwargs:   Additional keyword arguments; accepted but not used by
                    this method.

    Returns:
        pandas.DataFrame:  A DataFrame containing the emit count
        measurements as a timeseries. Each row represents a measurement
        (aggregated over one minute) with the following columns:

        * timestamp: The UTC timestamp for the metric,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from,
        * stream: The name of the outgoing stream from which the tuples
          that led to this metric came,
        * emit_count: The emit count during the metric time period.

        The rows of all components are concatenated with a fresh
        0..n-1 index. None is returned if the logical plan lists no spouts
        or bolts, or if the request for every component failed.
    """
    LOG.info("Getting emit counts for topology %s over a %d second "
             "period from %s to %s", topology_id,
             (end-start).total_seconds(), start.isoformat(),
             end.isoformat())

    logical_plan, start_time, end_time = self._query_setup(
        topology_id, cluster, environ, start, end)

    components: List[str] = (list(logical_plan["spouts"].keys()) +
                             list(logical_plan["bolts"].keys()))

    # Gather per-component frames and concatenate once at the end: this
    # avoids re-copying the accumulated frame on every iteration, and
    # DataFrame.append no longer exists in pandas 2.x.
    frames: List[pd.DataFrame] = []
    for component in components:
        try:
            comp_emit_counts: pd.DataFrame = \
                self.get_component_emission_counts(
                    topology_id, cluster, environ, component,
                    start_time, end_time, logical_plan)
        except HTTPError as http_error:
            LOG.warning("Fetching emit counts for component %s failed with"
                        " status code %s", component,
                        str(http_error.response.status_code))
            # No frame was produced for this component, so there is nothing
            # to add; move on rather than reusing a stale/undefined result.
            continue
        frames.append(comp_emit_counts)

    if not frames:
        return None

    return pd.concat(frames, ignore_index=True)