def calc_current_inter_instance_rps()

in metrics/heron/topology/routing_probabilities.py [0:0]


def calc_current_inter_instance_rps(
    metrics_client: HeronMetricsClient, topology_id: str, cluster: str,
    environ: str, start: dt.datetime, end: dt.datetime, tracker_url: str,
    **kwargs: Union[str, int, float]) -> pd.DataFrame:
    """ Get a DataFrame with the instance to instance routing probabilities for
    each source instance's output streams from a currently running topology.
    This method uses several assumptions to infer the routing probabilities
    between connections.

    This method will not work for calculating routing probabilities of
    connections that come from a source component that was itself a recipient
    of a fields connection. This method relies on the source instances of
    fields grouped connections receiving the same key distribution as all
    other source instances of that component. i.e. it assumes the sources of
    all fields groupings only receive shuffle groupings.

    This method also assumes that the spout instances will emit equal key
    distributions if they are the source of fields grouped connections.

    The probabilities are derived by averaging the ISAP time series (as
    returned by calculate_ISAP) for each destination task, destination
    component, stream and source component combination. That single averaged
    value is then repeated for every task ID of the source component, which is
    where the "equal key distribution" assumptions above are applied.

    Arguments:
        metrics_client (HeronMetricsClient):    The metrics client from which
                                                to extract transfer count
                                                data.
        topology_id (str):  The topology identification string.
        cluster (str):    The cluster parameter for the Heron Tracker API.
        environ (str):    The environ parameter (prod, devel, test etc) for
                            the Heron Tracker API.
        start (dt.datetime):    The UTC datetime object for the start of the
                                metrics gathering window.
        end (dt.datetime):  The UTC datetime object for the end of the metrics
                            gathering window.
        tracker_url (str):    The URL for the Heron Tracker API. This method
                              needs to analyse the logical and physical plans
                              of the specified topology so needs access to
                              this API.
        **kwargs:   Additional key word arguments which are forwarded,
                    unchanged, to calculate_ISAP.

    Returns:
        pandas.DataFrame:   A DataFrame with the following columns:

        * source_task: The source instance's task ID.
        * source_component: The source instance's component name.
        * stream: The stream ID string for the outgoing stream from the source.
        * destination_task: The destination instance's task ID.
        * destination_component: The destination instance's component name.
        * routing_probability: The probability (between 0 and 1) that a tuple
          leaving the source instance on the specified stream will be routed to
          the destination instance.

        If no (non-system) ISAP rows are available the returned DataFrame is
        empty but still carries the columns listed above.

    Raises:
        RuntimeError:   If any of the specified key word arguments are not
                        supplied (presumably raised by the downstream ISAP
                        calculation the key word arguments are passed to).
        NotImplementedError:    If the specified topology has a fields grouped
                                connection leading into another fields grouped
                                connection. This is not a currently supported
                                scenario.
        KeyError:   If a source component present in the ISAP results has no
                    entry in the component to task ID mapping returned by the
                    tracker.
    """

    LOG.info("Calculating instance to instance routing probabilities for "
             "topology %s for period from %s to %s", topology_id,
             start.isoformat(), end.isoformat())

    LOG.debug("Checking for fields to fields grouped connections")

    if groupings.has_fields_fields(tracker_url, topology_id, cluster, environ):
        grouping_msg: str = (
            f"The topology {topology_id} has at least one fields grouped "
            f"connection where the source of the connection also received a "
            f"fields grouped connection. This means the key distribution into "
            f"the source could be unbalanced (across the component) and this "
            f"method does not yet support this scenario.")
        LOG.error(grouping_msg)
        raise NotImplementedError(grouping_msg)

    isap: pd.DataFrame = calculate_ISAP(metrics_client, topology_id, cluster,
                                        environ, start, end, **kwargs)

    # Remove system heartbeat streams (any source component whose name
    # contains a double underscore)
    isap = isap[~isap.source_component.str.contains("__")]

    # Munge the frame into the correct format. Take an average of the whole
    # time series for each instance
    # TODO: Look at other summary methods for ISAP time series
    r_probs: pd.DataFrame = (
        isap.groupby(["task", "component", "stream", "source_component"])
        .ISAP
        .mean()
        .reset_index()
        .rename(columns={"ISAP": "routing_probability",
                         "task": "destination_task",
                         "component": "destination_component"}))

    comp_task_ids: Dict[str, List[int]] = \
        tracker.get_component_task_ids(tracker_url, cluster, environ,
                                       topology_id)

    # The column order matches the key order of the row dictionaries below.
    # Passing it explicitly guarantees the documented schema even when there
    # are no rows to build the frame from.
    output_columns: List[str] = [
        "source_task", "source_component", "stream", "destination_task",
        "destination_component", "routing_probability"]

    # Every task of the source component is given the same averaged
    # probability for a given stream/destination pairing.
    output: List[Dict[str, Union[str, int, float]]] = [
        {"source_task": source_task,
         "source_component": row.source_component,
         "stream": row.stream,
         "destination_task": row.destination_task,
         "destination_component": row.destination_component,
         "routing_probability": row.routing_probability}
        for row in r_probs.itertuples(index=False)
        for source_task in comp_task_ids[row.source_component]]

    return pd.DataFrame(output, columns=output_columns)