in common/heron/tracker.py [0:0]
def get_topologies(tracker_url: str, cluster: str = None,
environ: str = None) -> pd.DataFrame:
""" Gets the details from the Heron Tracker API of all registered
topologies. The results can be limited to a specific cluster and
environment.
Arguments:
tracker_url (str): The base url string for the Heron Tracker instance.
cluster (str): Optional cluster to limit search results to.
environ (str): Optional environment to limit the search to (eg. prod,
devel, test, etc).
Returns:
pandas.DataFrame: A DataFrame containing details of all topologies
registered with the Heron tracker. This has the columns for:
* topology: The topology ID.
* cluster: The cluster the topology is running on.
* environ: The environment the topology is running in.
* user: The user that uploaded the topology.
Raises:
requests.HTTPError: If a non 200 status code is returned.
"""
LOG.info("Fetching list of available topologies")
topo_url: str = tracker_url + "/topologies"
response: requests.Response = requests.get(topo_url,
params={"cluster": cluster,
"environ": environ})
try:
response.raise_for_status()
except requests.HTTPError as err:
LOG.error("Request for topology list for cluster: %s, environment: %s "
"failed with error code: %s", cluster, environ,
str(response.status_code))
raise err
results: Dict[str, Any] = response.json()["result"]
output: List[Dict[str, str]] = []
for cluster_name, cluster_dict in results.items():
for user, user_dict in cluster_dict.items():
for environment, topology_list in user_dict.items():
for topology in topology_list:
row: Dict[str, str] = {
"cluster": cluster_name,
"user": user,
"environ": environment,
"topology": topology}
output.append(row)
return pd.DataFrame(output)