common/heron/tracker.py (180 lines of code) (raw):

# Copyright 2018 Twitter, Inc. # Licensed under the Apache License, Version 2.0 # http://www.apache.org/licenses/LICENSE-2.0 """ This module contains methods for extracting and analysing information from the Heron Tracker services REST API: https://twitter.github.io/heron/docs/operators/heron-tracker-api/ """ import logging from typing import List, Dict, Union, Any, Tuple, cast import requests import pandas as pd LOG: logging.Logger = logging.getLogger(__name__) # pylint: disable=too-many-arguments def get_topologies(tracker_url: str, cluster: str = None, environ: str = None) -> pd.DataFrame: """ Gets the details from the Heron Tracker API of all registered topologies. The results can be limited to a specific cluster and environment. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): Optional cluster to limit search results to. environ (str): Optional environment to limit the search to (eg. prod, devel, test, etc). Returns: pandas.DataFrame: A DataFrame containing details of all topologies registered with the Heron tracker. This has the columns for: * topology: The topology ID. * cluster: The cluster the topology is running on. * environ: The environment the topology is running in. * user: The user that uploaded the topology. Raises: requests.HTTPError: If a non 200 status code is returned. """ LOG.info("Fetching list of available topologies") topo_url: str = tracker_url + "/topologies" response: requests.Response = requests.get(topo_url, params={"cluster": cluster, "environ": environ}) try: response.raise_for_status() except requests.HTTPError as err: LOG.error("Request for topology list for cluster: %s, environment: %s " "failed with error code: %s", cluster, environ, str(response.status_code)) raise err results: Dict[str, Any] = response.json()["result"] output: List[Dict[str, str]] = [] for cluster_name, cluster_dict in results.items(): for user, user_dict in cluster_dict.items(): for environment, topology_list in user_dict.items(): for topology in topology_list: row: Dict[str, str] = { "cluster": cluster_name, "user": user, "environ": environment, "topology": topology} output.append(row) return pd.DataFrame(output) def get_logical_plan(tracker_url: str, cluster: str, environ: str, topology: str) -> Dict[str, Any]: """ Get the logical plan dictionary from the heron tracker API. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. Returns: Dict[str, Any]: A dictionary containing details of the spouts and bolts. Raises: requests.HTTPError: If a non 200 status code is returned. """ LOG.info("Fetching logical plan for topology: %s", topology) logical_url: str = tracker_url + "/topologies/logicalplan" response: requests.Response = requests.get(logical_url, params={"cluster": cluster, "environ": environ, "topology": topology}) try: response.raise_for_status() except requests.HTTPError as err: LOG.error("Logical plan request for topology: %s , cluster: %s, " "environment: %s failed with error code: %s", topology, cluster, environ, str(response.status_code)) raise err return response.json()["result"] def get_physical_plan(tracker_url: str, cluster: str, environ: str, topology: str) -> Dict[str, Any]: """ Get the physical plan dictionary from the heron tracker API. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. Returns: Dict[str, Any]: A dictionary containing details of the containers and stream managers for the specified topology. Raises: requests.HTTPError: If a non 200 status code is returned. """ LOG.info("Fetching physical plan for topology: %s", topology) physical_url: str = tracker_url + "/topologies/physicalplan" response: requests.Response = requests.get(physical_url, params={"cluster": cluster, "environ": environ, "topology": topology}) try: response.raise_for_status() except requests.HTTPError as err: LOG.error("Physical plan request for topology: %s , cluster: %s, " "environment: %s failed with error code: %s", topology, cluster, environ, str(response.status_code)) raise err return response.json()["result"] def get_packing_plan(tracker_url: str, cluster: str, environ: str, topology: str) -> Dict[str, Any]: """ Get the packing plan dictionary from the heron tracker API. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. Returns: Dict[str, Any]: A dictionary containing details of the containers for the specified topology, in terms of their resource allocations. Raises: requests.HTTPError: If a non 200 status code is returned. """ LOG.info("Fetching packing plan for topology: %s", topology) packing_url: str = tracker_url + "/topologies/packingplan" response: requests.Response = requests.get(packing_url, params={"cluster": cluster, "environ": environ, "topology": topology}) try: response.raise_for_status() except requests.HTTPError as err: LOG.error("Packing plan request for topology: %s , cluster: %s, " "environment: %s failed with error code: %s", topology, cluster, environ, str(response.status_code)) raise err return response.json()["result"] def parse_instance_name(instance_name: str) -> Dict[str, Union[str, int]]: """ Parses the instance name string returned by the Heron Tracker API into a dictionary with instance information. Arguments: instance_name (str): Instance name string in the form: container_<container_num>_<component>_<task_id> Returns: Dict[str, Union[str, int]]: A dictionary with the following keys: *container* : The container id as a integer, *component* : The component name string, *task_id* : The instances task id as an integer. """ parts: List[str] = instance_name.split("_") if len(parts) == 4: component: str = parts[2] elif len(parts) > 4: component = "_".join(parts[2:-2]) return {"container": int(parts[1]), "component": component, "task_id": int(parts[-1])} def get_topology_info(tracker_url: str, cluster: str, environ: str, topology: str) -> Dict[str, Union[int, str]]: """ Get the information dictionary from the heron tracker API. This contains the logical and physical plans as well as other information on the topology. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. Returns: Dict[str, Union[str, int]]: A dictionary containing all the details the tracker has on the specified topology. Raises: requests.HTTPError: If a non 200 status code is returned. """ info_url: str = tracker_url + "/topologies/info" response: requests.Response = requests.get(info_url, params={"cluster": cluster, "environ": environ, "topology": topology}) response.raise_for_status() LOG.info("Fetched information for topology: %s", topology) return response.json()["result"] def get_metrics(tracker_url: str, cluster: str, environ: str, topology: str, component: str, interval: int, metrics: Union[str, List[str]] ) -> Dict[str, Any]: """ Gets aggregated metrics for the specified component in the specified topology. Metrics are aggregated over the supplied interval. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. component (str): The name of the topology component. interval (int): The period in seconds (counted backwards from now) over which metrics should be aggregated. metrics (str or list): A metrics name or list of metrics names to be returned. Returns: Dict[str, Any]: A dictionary containing aggregate metrics for the specified topology component. Raises: requests.HTTPError: If a non 200 status code is returned. """ payload: Dict[str, Union[str, List[str], int]] = { "cluster": cluster, "environ": environ, "topology": topology, "component": component, "interval": interval, "metricname": metrics} metrics_url: str = tracker_url + "/topologies/metrics" response: requests.Response = requests.get(metrics_url, params=payload) response.raise_for_status() LOG.info("Fetched aggregate summaries for metrics: %s from topology: %s", str(metrics), topology) return response.json()["result"] def get_metrics_timeline(tracker_url: str, cluster: str, environ: str, topology: str, component: str, start_time: int, end_time: int, metrics: Union[str, List[str]] ) -> Dict[str, Any]: """ Gets metrics timelines for the specified component in the specified topology. Metrics are aggregated into one minuet intervals keyed by POSIX UTC timestamps (in seconds) for the start of each interval. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. component (str): The name of the topology component. start_time (int): The start point of the timeline. This should be a UTC POSIX timestamp in seconds. end_time (int): The end point of the timeline. This should be a UTC POSIX timestamp in seconds. metrics (str or list): A metrics name or list of metrics names to be returned. Returns: Dict[str, Any]: A dictionary containing metrics timelines for the specified topology component. Raises: requests.HTTPError: If a non 200 status code is returned. """ duration: int = end_time - start_time if duration > 10800: LOG.warning("Duration of metrics timeline interval for metrics: %s of " "topology: %s was greater than the 3 hours of data stored " "by the Topology Master", str(metrics), topology) payload: Dict[str, Union[str, List[str], int]] = { "cluster": cluster, "environ": environ, "topology": topology, "component": component, "starttime": start_time, "endtime": end_time, "metricname": metrics} metrics_timeline_url: str = tracker_url + "/topologies/metricstimeline" response: requests.Response = requests.get(metrics_timeline_url, params=payload) response.raise_for_status() LOG.info("Fetched timeline(s) for metric(s): %s of component: %s from " "topology: %s over a period of %d seconds", str(metrics), component, topology, duration) return response.json()["result"] def issue_metrics_query(tracker_url: str, cluster: str, environ: str, topology: str, start_time: int, end_time: int, query: str) -> Dict[str, Any]: """ Issues the supplied query and runs it against the metrics for the supplied topology in the interval defined by the start and end times. For query syntax see: https://apache.github.io/incubator-heron/docs/operators/heron-tracker-api/#metricsquery Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. start_time (int): The start point of the timeline. This should be a UTC POSIX timestamp in seconds. start_time (int): The end point of the timeline. This should be a UTC POSIX timestamp in seconds. query (str): The query string to be issued to the Tracker API. Returns: Dict[str, Any]: A dictionary containing the query results. Raises: requests.HTTPError: If a non 200 status code is returned. """ duration: int = end_time - start_time if duration > 10800: LOG.warning("Duration of metrics timeline interval for metrics query: " " %s for topology: %s was greater than the 3 hours of " " data stored by the Topology Master", query, topology) payload: Dict[str, Union[str, int]] = { "cluster": cluster, "environ": environ, "topology": topology, "starttime": start_time, "endtime": end_time, "query": query} metrics_query_url: str = tracker_url + "/topologies/metricsquery" response: requests.Response = requests.get(metrics_query_url, params=payload) response.raise_for_status() LOG.info("Fetched results of query: %s from topology: %s over a " "period of %d seconds", query, topology, duration) return response.json()["result"] def get_incoming_streams(logical_plan: Dict[str, Any], component_name: str) -> List[str]: """ Gets a list of input stream names for the supplied component in the supplied logical plan. Arguments: logical_plan (dict): Logical plan dictionary returned by the Heron Tracker API. component (str): The name of the component whose incoming streams are to be extracted. Returns: List[str]: A list of incoming stream names. """ return [input_stream["stream_name"] for input_stream in logical_plan["bolts"][component_name]["inputs"]] def incoming_sources_and_streams(logical_plan: Dict[str, Any], component_name: str ) -> List[Tuple[str, str]]: """ Gets a list of (source component, input stream name) tuples for the supplied component in the supplied logical plan. Arguments: logical_plan (dict): Logical plan dictionary returned by the Heron Tracker API. component (str): The name of the component whose incoming streams are to be extracted. Returns: List[str]: A list of (source component name, incoming stream name) tuples. """ return [(input_stream["component_name"], input_stream["stream_name"]) for input_stream in logical_plan["bolts"][component_name]["inputs"]] def get_outgoing_streams(logical_plan: Dict[str, Any], component_name: str) -> List[str]: """ Gets a list of output stream names for the supplied component in the supplied logical plan. Arguments: logical_plan (dict): Logical plan dictionary returned by the Heron Tracker API. component (str): The name of the component whose outgoing streams are to be extracted. Returns: List[str]: A list of outgoing stream names. """ # Check if this is a spout if logical_plan["spouts"].get(component_name): comp_type: str = "spouts" else: comp_type = "bolts" return [output_stream["stream_name"] for output_stream in logical_plan[comp_type][component_name]["outputs"]] def get_component_task_ids(tracker_url: str, cluster: str, environ: str, topology: str) -> Dict[str, List[int]]: """ Get a dictionary mapping from component name to a list of integer task id for the instances belonging to that component. Arguments: tracker_url (str): The base url string for the Heron Tracker instance. cluster (str): The cluster the topology is running in. environ (str): The environment the topology is running in (eg. prod, devel, test, etc). topology (str): The topology name. Returns: Dict[str, List[int]]: A dictionary mapping from component name to a list of integer task id for the instances belonging to that component. """ pplan: Dict[str, Any] = get_physical_plan(tracker_url, cluster, environ, topology) output: Dict[str, List[int]] = {} for comp_type in ["bolts", "spouts"]: for comp_name, instance_list in pplan[comp_type].items(): output[comp_name] = [cast(int, parse_instance_name(i_name)["task_id"]) for i_name in instance_list] return output