in metrics/heron/influxdb/client.py [0:0]
def get_execute_counts(self, topology_id: str, cluster: str, environ: str,
start: dt.datetime, end: dt.datetime,
**kwargs: Union[str, int, float]) -> pd.DataFrame:
    """ Gets a time series of the execute counts of each of the bolt
    instances in the specified topology
    Arguments:
        topology_id (str): The topology ID string.
        cluster (str): The cluster name.
        environ (str): The environment that the topology is running in.
        start (datetime.datetime): UTC datetime instance for the start of
                                   the metrics gathering period.
        end (datetime.datetime): UTC datetime instance for the end of
                                 the metrics gathering period.
        **kwargs: Additional keyword arguments accepted for interface
                  compatibility with other metrics clients; unused here.
    Returns:
        pandas.DataFrame: A DataFrame containing the execute count
        measurements as a timeseries. Each row represents a measurement
        with the following columns:
        * timestamp: The UTC timestamp for the metric,
        * component: The component this metric comes from,
        * task: The instance ID number for the instance that the metric
          comes from,
        * container: The ID for the container this metric comes from.
        * stream: The name of the incoming stream from which the tuples
          that lead to this metric came from,
        * source_component: The name of the component the stream's source
          instance belongs to,
        * execute_count: The execute count during the metric time period.
    """
    start_time: str = convert_datetime_to_rfc3339(start)
    end_time: str = convert_datetime_to_rfc3339(end)
    database: str = create_db_name(self.database_prefix, topology_id,
                                   cluster, environ)
    LOG.info("Fetching execute counts for topology: %s on cluster: %s in "
             "environment: %s for a %s second time period between %s and "
             "%s", topology_id, cluster, environ,
             (end-start).total_seconds(), start_time, end_time)
    self.client.switch_database(database)
    metric_name: str = "execute-count"
    # Raw string: the backslash escapes (\- and \/) are part of the
    # InfluxQL regex literal, not Python escapes. A non-raw string here
    # triggers invalid-escape-sequence warnings on modern CPython.
    metric_regex: str = r"/execute\-count\/+.*\/+.*/"
    measurement_names: List[str] = self.get_metric_measurement_names(
        database, metric_name, metric_regex)
    output: List[Dict[str, Union[str, int, dt.datetime]]] = []
    for measurement_name in measurement_names:
        # Measurement names look like:
        #   execute-count/<source_component>/<stream>
        # maxsplit=2 keeps any further "/" characters inside the stream
        # name instead of raising a ValueError on unpacking.
        _, source_component, stream = measurement_name.split("/", 2)
        query_str: str = (f"SELECT Component, Instance, value "
                          f"FROM \"{measurement_name}\" "
                          f"WHERE time >= '{start_time}' "
                          f"AND time <= '{end_time}'")
        LOG.debug("Querying %s measurements with influx QL statement: %s",
                  metric_name, query_str)
        results: ResultSet = self.client.query(query_str)
        for point in results.get_points():
            # Instance names encode container and task IDs; skip (with a
            # warning) any point whose instance name does not parse.
            instance: Optional[re.Match] = re.search(
                INSTANCE_NAME_RE, point["Instance"])
            if instance:
                instance_dict: Dict[str, str] = instance.groupdict()
            else:
                LOG.warning("Could not parse instance name: %s",
                            point["Instance"])
                continue
            row: Dict[str, Union[str, int, dt.datetime]] = {
                "timestamp": convert_rfc339_to_datetime(point["time"]),
                "component": point["Component"],
                "task": int(instance_dict["task"]),
                "container": int(instance_dict["container"]),
                "stream": stream,
                "source_component": source_component,
                "execute_count": int(point["value"])}
            output.append(row)
    return pd.DataFrame(output)