in tools/heron/tracker_stats.py [0:0]
# Names of the per-topology count columns, in output order.
# Spouts are classified by their number of output streams; bolts by their
# number of input and output streams, where bolts with zero outputs are sinks.
_COMPONENT_KIND_COLUMNS = (
    "spout_single_output",
    "spout_multiple_output",
    # sink bolts
    "bolt_single_in_zero_out",
    "bolt_multiple_in_zero_out",
    # intermediate bolts
    "bolt_single_in_single_out",
    "bolt_multiple_in_single_out",
    "bolt_single_in_multiple_out",
    "bolt_multiple_in_multiple_out",
)


def _count_component_kinds(logical_plan: dict, topology_id: str) -> dict:
    """ Counts the spouts and bolts of one logical plan by stream fan-in and
    fan-out.

    Arguments:
        logical_plan (dict):    A logical plan as returned by
                                tracker.get_logical_plan, read through its
                                "spouts" and "bolts" mappings whose values
                                have "outputs" (and, for bolts, "inputs").
        topology_id (str):      Only used in log messages.

    Returns:
        dict: One integer count per name in _COMPONENT_KIND_COLUMNS.
        Components that fit no category (a spout with no outputs or a bolt
        with no inputs) are logged as warnings and not counted.
    """
    counts: Dict[str, int] = dict.fromkeys(_COMPONENT_KIND_COLUMNS, 0)

    for name, spout in logical_plan["spouts"].items():
        num_outputs = len(spout["outputs"])
        if num_outputs == 1:
            counts["spout_single_output"] += 1
        elif num_outputs > 1:
            counts["spout_multiple_output"] += 1
        else:
            # Previously such spouts fell into "multiple output", which was
            # wrong; report them instead of miscounting.
            LOG.warning("Spout %s in topology %s has no outputs and was not "
                        "counted", name, topology_id)

    for name, bolt in logical_plan["bolts"].items():
        num_inputs = len(bolt["inputs"])
        num_outputs = len(bolt["outputs"])
        if num_inputs == 0:
            LOG.warning("Bolt %s in topology %s has no inputs and was not "
                        "counted", name, topology_id)
            continue
        in_kind = "single" if num_inputs == 1 else "multiple"
        if num_outputs == 0:
            out_kind = "zero"
        elif num_outputs == 1:
            out_kind = "single"
        else:
            out_kind = "multiple"
        counts[f"bolt_{in_kind}_in_{out_kind}_out"] += 1

    return counts


def add_logical_plan_info(tracker_url: str,
                          topologies: pd.DataFrame = None) -> pd.DataFrame:
    """ Counts the kinds of spouts and bolts in the logical plan of each
    topology.

    Each spout is classified by its number of output streams and each bolt
    by its number of input and output streams (see _COMPONENT_KIND_COLUMNS).
    Topologies whose logical plan cannot be fetched (requests.HTTPError) are
    skipped and a warning is logged.

    Arguments:
        tracker_url (str):  The URL for the Heron Tracker API
        topologies (pd.DataFrame):  The topologies summary from the heron
                                    tracker can be supplied, if not it will
                                    be fetched fresh from the Tracker API.
                                    It must have "cluster", "environ", "user"
                                    and "topology" columns and is not
                                    modified.

    Returns:
        pandas.DataFrame: A new DataFrame with one row per topology whose
        logical plan was fetched. Columns are "topology", the count columns
        in _COMPONENT_KIND_COLUMNS, then "cluster", "environ" and "user".
        The columns are present even when no plan could be fetched. The
        result can be merged with the topologies summary on the key
        columns.
    """
    if topologies is None:
        topologies = tracker.get_topologies(tracker_url)

    output: List[Dict] = []

    for (cluster, environ, user), data in topologies.groupby(["cluster",
                                                               "environ",
                                                               "user"]):
        for topology_id in data.topology:
            try:
                logical_plan: Dict[str, Any] = tracker.get_logical_plan(
                    tracker_url, cluster, environ, topology_id)
            except requests.HTTPError as err:
                # Best effort: skip this topology, but record why its row is
                # missing from the result.
                LOG.warning("Skipping topology %s (cluster: %s, environ: %s):"
                            " could not fetch logical plan: %s", topology_id,
                            cluster, environ, err)
                continue

            LOG.info("Topology ID: %s", topology_id)
            row: Dict[str, Any] = {"topology": topology_id}
            row.update(_count_component_kinds(logical_plan, topology_id))
            # The plan is looked up by (cluster, environ, id), so the ID alone
            # may not identify a row; keep the grouping keys alongside it.
            row["cluster"] = cluster
            row["environ"] = environ
            row["user"] = user
            output.append(row)

    # Explicit columns keep a stable schema, including for an empty result.
    columns = ["topology", *_COMPONENT_KIND_COLUMNS,
               "cluster", "environ", "user"]
    return pd.DataFrame(output, columns=columns)