in metrics/heron/tmaster/client.py [0:0]
def get_component_emission_counts(self, topology_id: str, cluster: str,
                                  environ: str, component_name: str,
                                  start: int, end: int,
                                  logical_plan: Dict[str, Any] = None
                                  ) -> pd.DataFrame:
    """ Gets the emit counts, as a timeseries, for every instance of the
    specified component of the specified topology. The start and end times
    define the window over which to gather the metrics. The window duration
    should be less than 3 hours as this is the limit of what the Topology
    master stores.

    Arguments:
        topology_id (str):  The topology identification string.
        cluster (str):  The cluster the topology is running in.
        environ (str):  The environment the topology is running in (eg.
                        prod, devel, test, etc).
        component_name (str):   The name of the component whose metrics
                                are required.
        start (int):    Start time for the time period the query is run
                        against. This should be a UTC POSIX time integer
                        (seconds since epoch).
        end (int):  End time for the time period the query is run against.
                    This should be a UTC POSIX time integer (seconds since
                    epoch).
        logical_plan (dict):    Optional dictionary logical plan returned
                                by the Heron Tracker API. If not supplied
                                (or empty) this method will call the API
                                to get the logical plan.

    Returns:
        pandas.DataFrame:   A DataFrame containing the emit count
        measurements as a timeseries, or None if the tracker response
        contains no timeline entries. Each row represents a measurement
        (aggregated over one minute) with the following columns:

        * timestamp: The UTC timestamp for the metric time period,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from,
        * stream: The name of the outgoing stream onto which the tuples
          counted by this metric were emitted,
        * emit_count: The emit count in that metric time period.

    Raises:
        KeyError:   If the tracker response has no "timeline" entry.
    """

    LOG.info("Getting emit count metrics for component %s of topology "
             "%s", component_name, topology_id)

    if not logical_plan:
        LOG.debug("Logical plan not supplied, fetching from Heron Tracker")
        logical_plan = tracker.get_logical_plan(self.tracker_url, cluster,
                                                environ, topology_id)

    outgoing_streams: List[str] = tracker.get_outgoing_streams(
        logical_plan, component_name)

    # One emit-count metric is requested per outgoing stream.
    metrics: List[str] = ["__emit-count/" + stream
                          for stream in outgoing_streams]

    results: Dict[str, Any] = tracker.get_metrics_timeline(
        self.tracker_url, cluster, environ, topology_id, component_name,
        start, end, metrics)

    # Collect the per-stream frames and concatenate once at the end.
    # DataFrame.append was removed in pandas 2.0 and copied the whole
    # accumulated frame on every call.
    frames: List[pd.DataFrame] = []

    for stream_metric, instance_timelines in results["timeline"].items():
        # Metric names have the form "__emit-count/<stream>"; the stream
        # name is the final path segment.
        outgoing_stream: str = stream_metric.split("/")[-1]

        frames.append(instance_timelines_to_dataframe(
            instance_timelines, outgoing_stream, "emit_count",
            lambda m: int(float(m))))

    if not frames:
        # Preserve the historical contract: no timeline data yields None.
        return None

    if len(frames) == 1:
        # A single stream is returned as-is (index untouched), exactly as
        # before.
        return frames[0]

    return pd.concat(frames, ignore_index=True)