in metrics/heron/topology/groupings.py [0:0]
def summary(tracker_url: str, topology_id: str, cluster: str,
environ: str) -> Dict[str, int]:
""" Gets a summary of the numbers of each stream grouping type in the
specified topology.
Arguments:
tracker_url (str): The URL for the Heron Tracker API
topology_id (str): The topology ID string
cluster (str): The name of the cluster the topology is running on
environ (str): The environment the topology is running in
Returns:
A dictionary mapping from stream grouping name to the count for the
number of these type of stream groupings in the topology. Also includes
counts for stream combination, e.g. SHUFFLE->FIELDS : 2 implies that
there are 2 cases where the source component of a fields grouped stream
has an incoming shuffle grouped stream.
"""
lplan: Dict[str, Any] = tracker.get_logical_plan(tracker_url, cluster,
environ, topology_id)
stream_set: Set[Tuple[str, str, str]] = set()
for bolt_details in lplan["bolts"].values():
for input_stream in bolt_details["inputs"]:
stream_set.add((input_stream["stream_name"],
input_stream["component_name"],
input_stream["grouping"]))
grouping_counts: DefaultDict[str, int] = defaultdict(int)
for _, source_component, grouping in stream_set:
grouping_counts[grouping] += 1
# Now look at the inputs in to this source component and count the
# types of input grouping
if source_component in lplan["bolts"]:
for in_stream in lplan["bolts"][source_component]["inputs"]:
in_grouping: str = in_stream["grouping"]
grouping_counts[in_grouping + "->" + grouping] += 1
return dict(grouping_counts)