metrics/heron/topology/groupings.py (30 lines of code) (raw):

# Copyright 2018 Twitter, Inc. # Licensed under the Apache License, Version 2.0 # http://www.apache.org/licenses/LICENSE-2.0 """ This module contains methods for analysing and summarising the groupings within Heron topologies. """ import logging from typing import Set, Tuple, Dict, Any, DefaultDict from collections import defaultdict from caladrius.common.heron import tracker LOG: logging.Logger = logging.getLogger(__name__) def summary(tracker_url: str, topology_id: str, cluster: str, environ: str) -> Dict[str, int]: """ Gets a summary of the numbers of each stream grouping type in the specified topology. Arguments: tracker_url (str): The URL for the Heron Tracker API topology_id (str): The topology ID string cluster (str): The name of the cluster the topology is running on environ (str): The environment the topology is running in Returns: A dictionary mapping from stream grouping name to the count for the number of these type of stream groupings in the topology. Also includes counts for stream combination, e.g. SHUFFLE->FIELDS : 2 implies that there are 2 cases where the source component of a fields grouped stream has an incoming shuffle grouped stream. """ lplan: Dict[str, Any] = tracker.get_logical_plan(tracker_url, cluster, environ, topology_id) stream_set: Set[Tuple[str, str, str]] = set() for bolt_details in lplan["bolts"].values(): for input_stream in bolt_details["inputs"]: stream_set.add((input_stream["stream_name"], input_stream["component_name"], input_stream["grouping"])) grouping_counts: DefaultDict[str, int] = defaultdict(int) for _, source_component, grouping in stream_set: grouping_counts[grouping] += 1 # Now look at the inputs in to this source component and count the # types of input grouping if source_component in lplan["bolts"]: for in_stream in lplan["bolts"][source_component]["inputs"]: in_grouping: str = in_stream["grouping"] grouping_counts[in_grouping + "->" + grouping] += 1 return dict(grouping_counts) def has_fields_fields(tracker_url: str, topology_id: str, cluster: str, environ: str) -> bool: """ Performs a check to see if the specified topology has components connected via a fields grouping where the source component of that connection also receives a fields grouping. Arguments: tracker_url (str): The URL for the Heron Tracker API topology_id (str): The topology ID string cluster (str): The name of the cluster the topology is running on environ (str): The environment the topology is running in Returns: A boolean indicating if there is a field grouping source that also receives a fields grouping. """ grouping_summary: Dict[str, int] = summary(tracker_url, topology_id, cluster, environ) if "FIELDS->FIELDS" in grouping_summary: return True return False