in tools/heron/tracker_stats.py [0:0]
def add_pplan_info(tracker_url: str,
                   topologies: pd.DataFrame = None) -> pd.DataFrame:
    """ Combines information from the topology summary DataFrame with
    information from the physical plan of each topology.

    One output row is produced per topology whose physical plan could be
    fetched. Each row holds the topology's identifying fields, its scalar
    configuration values and a few instance/component counts.

    Arguments:
        tracker_url (str):  The URL for the Heron Tracker API
        topologies (pd.DataFrame):  The topologies summary from the heron
                                    tracker can be supplied, if not it will
                                    be fetched fresh from the Tracker API.

    Returns:
        pandas.DataFrame:   The topologies summary DataFrame with physical plan
        information added. This will return a new DataFrame and will not modify
        the supplied DataFrame. Topologies whose physical plan request raises
        requests.HTTPError are left out of the result.
    """
    if topologies is None:
        topologies = tracker.get_topologies(tracker_url)

    output: List[Dict[str, Union[str, float, List[int]]]] = []

    grouped = topologies.groupby(["cluster", "environ", "user"])

    for (cluster, environ, user), data in grouped:

        for topology_id in data.topology:

            try:
                pplan: Dict[str, Any] = tracker.get_physical_plan(
                    tracker_url, cluster, environ, topology_id)
            except requests.HTTPError:
                # If we cannot fetch the plan, skip this topology
                continue

            row: Dict[str, Union[str, float, List[int]]] = {
                "topology": topology_id,
                "cluster": cluster,
                "environ": environ,
                "user": user,
            }

            # Add information from the configuration dictionary
            config: Dict[str, str] = pplan["config"]

            for key, value in config.items():

                # Some of the custom config values are large dictionaries or
                # lists so we will skip them
                if isinstance(value, (dict, list)):
                    continue

                # Drop the first dot-separated segment of the key and join the
                # remaining segments with "_" so the result can be used for
                # namespace (attribute style) calls on the DataFrame.
                # NOTE(review): a key containing no "." maps to the empty
                # string here - confirm all config keys are dotted.
                new_key: str = "_".join(key.split(".")[1:])

                # Try to convert any values that are numeric so we can do
                # summary stats
                try:
                    new_value: Union[str, float] = float(value)
                except ValueError:
                    # Not a numeric string so keep the value unchanged
                    new_value = value
                except TypeError:
                    LOG.error("Value of key: %s was not a string or number it"
                              " was a %s", key, str(type(value)))
                    # There is no usable value for this key. Skip it rather
                    # than fall through with new_value unbound (first key) or
                    # still holding the previous key's value.
                    continue

                row[new_key] = new_value

            # Add instances stats for this topology
            row["total_instances"] = len(pplan["instances"])
            row["instances_per_container_dist"] = [
                len(stmgr["instance_ids"])
                for stmgr in pplan["stmgrs"].values()]

            bolt_count: int = len(pplan["bolts"])
            spout_count: int = len(pplan["spouts"])
            row["total_bolts"] = bolt_count
            row["total_spouts"] = spout_count
            row["total_components"] = bolt_count + spout_count

            output.append(row)

    return pd.DataFrame(output)