def add_logical_plan_info()

in tools/heron/tracker_stats.py [0:0]


def _count_component_kinds(logical_plan: Dict[str, Any]) -> Dict[str, int]:
    """ Counts the spouts and bolts of a logical plan by their stream fan-in
    and fan-out.

    Arguments:
        logical_plan (dict):    A logical plan dictionary containing "spouts"
                                and "bolts" mappings. Every spout entry must
                                have an "outputs" collection and every bolt
                                entry must have "inputs" and "outputs"
                                collections.

    Returns:
        Dict[str, int]: A dictionary mapping each of the two spout kinds and
        six bolt kinds to the number of components of that kind.
    """
    counts: Dict[str, int] = {
        # there are two possible kinds of spouts:
        "spout_single_output": 0,
        "spout_multiple_output": 0,
        # there are six possible kinds of bolts, two of which are sinks:
        "bolt_single_in_zero_out": 0,
        "bolt_multiple_in_zero_out": 0,
        # and four of which are intermediate bolts:
        "bolt_single_in_single_out": 0,
        "bolt_multiple_in_single_out": 0,
        "bolt_single_in_multiple_out": 0,
        "bolt_multiple_in_multiple_out": 0,
    }

    for spout in logical_plan["spouts"].values():
        # NOTE: any spout that does not have exactly one output (including
        # one with no outputs at all) is counted as "multiple".
        if len(spout["outputs"]) == 1:
            counts["spout_single_output"] += 1
        else:
            counts["spout_multiple_output"] += 1

    for bolt in logical_plan["bolts"].values():
        num_outputs = len(bolt["outputs"])
        num_inputs = len(bolt["inputs"])

        # A bolt without any inputs does not fit any of the six kinds and is
        # therefore left uncounted.
        if num_inputs == 0:
            continue

        in_kind = "single" if num_inputs == 1 else "multiple"
        if num_outputs == 0:
            out_kind = "zero"
        elif num_outputs == 1:
            out_kind = "single"
        else:
            out_kind = "multiple"

        counts[f"bolt_{in_kind}_in_{out_kind}_out"] += 1

    return counts


def add_logical_plan_info(tracker_url: str,
                          topologies: pd.DataFrame = None) -> pd.DataFrame:
    """ Summarises the logical plan of each topology listed in the topology
    summary DataFrame by counting its spouts and bolts according to their
    number of input and output streams.

    Arguments:
        tracker_url (str):  The URL for the Heron Tracker API
        topologies (pd.DataFrame):  The topologies summary from the heron
                                    tracker can be supplied, if not it will
                                    be fetched fresh from the Tracker API. It
                                    must contain "cluster", "environ", "user"
                                    and "topology" columns.

    Returns:
        pandas.DataFrame:   A new DataFrame with one row per topology whose
        logical plan could be fetched. It contains a "topology" column
        followed by the two spout count columns and the six bolt count
        columns. These columns are present even when there are no rows. The
        columns of the supplied summary are not carried over (merge on
        "topology" to combine them) and the supplied DataFrame is not
        modified. Topologies whose logical plan request raises an HTTP error
        are logged and omitted.
    """
    if topologies is None:
        topologies = tracker.get_topologies(tracker_url)

    # Fixing the column order up front keeps the schema of the result stable,
    # even when no logical plan could be processed.
    columns: List[str] = [
        "topology",
        "spout_single_output",
        "spout_multiple_output",
        "bolt_single_in_zero_out",
        "bolt_multiple_in_zero_out",
        "bolt_single_in_single_out",
        "bolt_multiple_in_single_out",
        "bolt_single_in_multiple_out",
        "bolt_multiple_in_multiple_out",
    ]
    output: List[Dict] = []

    # The user is part of the grouping key but is not needed to fetch the
    # logical plan.
    for (cluster, environ, _user), data in topologies.groupby(["cluster",
                                                               "environ",
                                                               "user"]):
        for topology_id in data.topology:

            try:
                logical_plan: Dict[str, Any] = tracker.get_logical_plan(
                    tracker_url, cluster, environ, topology_id)
            except requests.HTTPError as http_err:
                # If we cannot fetch the plan, skip this topology but leave
                # a trace of why it is missing from the results.
                LOG.warning("Could not fetch the logical plan for topology "
                            "%s, skipping it: %s", topology_id, http_err)
                continue

            LOG.info("Topology ID: %s",  topology_id)

            row: Dict = {"topology": topology_id}
            row.update(_count_component_kinds(logical_plan))
            output.append(row)

    return pd.DataFrame(output, columns=columns)