in tools/heron/tracker_stats.py [0:0]
def summarise_groupings(tracker_url: str,
                        topologies: pd.DataFrame = None) -> pd.DataFrame:
    """ Summarises the stream grouping counts of all topologies registered with
    the supplied Tracker instance.

    Arguments:
        tracker_url (str):  The URL for the Heron Tracker API.
        topologies (pd.DataFrame):  The topologies summary from the Heron
                                    Tracker can be supplied; if not, it will
                                    be fetched fresh from the Tracker API.
                                    It must contain "topology", "cluster" and
                                    "environ" columns.

    Returns:
        A DataFrame with one row per topology whose grouping summary could be
        fetched, with columns for:
            topology:   The topology ID
            cluster:    The cluster the topology is running on
            environ:    The environment the topology is running in
            Every other column of the topologies summary (e.g. the user that
            uploaded the topology), joined on topology, environ and cluster.
            A column for each type of stream grouping as well as combinations
            of stream grouping (incoming grouping)->(outgoing grouping) and
            their associated frequency count for each topology.

        Topologies whose summary request fails with an HTTP error are logged
        and skipped. If no summary could be collected at all, an empty
        DataFrame with the columns of the topologies summary is returned.
    """
    if topologies is None:
        topologies = tracker.get_topologies(tracker_url)

    # Rows are accumulated in a plain list and turned into a DataFrame once at
    # the end: DataFrame.append was removed in pandas 2.0 and copied the whole
    # frame on every call.
    rows = []

    for (cluster, environ), data in topologies.groupby(["cluster", "environ"]):
        for topology in data.topology:
            try:
                grouping_summary: Dict[str, int] = \
                    groupings.summary(tracker_url, topology, cluster, environ)
            except requests.HTTPError:
                LOG.warning("Unable to fetch grouping summary for topology: "
                            "%s, cluster: %s, environ: %s", topology, cluster,
                            environ)
                continue

            # Copy so the mapping returned by groupings.summary is not mutated.
            row = dict(grouping_summary)
            row["topology"] = topology
            row["cluster"] = cluster
            row["environ"] = environ
            rows.append(row)

    if not rows:
        # Nothing to merge: previously this path crashed calling .merge on
        # None. Return an empty frame that keeps the topologies' columns.
        return topologies.iloc[0:0].copy()

    # Summaries with differing grouping keys are unioned into columns here;
    # counts missing for a topology become NaN.
    output: pd.DataFrame = pd.DataFrame(rows)

    return output.merge(topologies, on=["topology", "environ", "cluster"])