def calculate_inter_instance_rps()

in metrics/heron/topology/routing_probabilities.py [0:0]


def calculate_inter_instance_rps(metrics_client: HeronMetricsClient,
                                 topology_id: str, cluster: str, environ: str,
                                 start: dt.datetime, end: dt.datetime
                                 ) -> pd.DataFrame:
    """ Get a DataFrame with the instance to instance routing probabilities for
    each source instance's output streams.

    The probability is the number of tuples a destination instance received
    from a source instance on a stream, divided by the total number of tuples
    all instances of that destination component received from the same source
    instance on the same stream.

    Arguments:
        metrics_client (HeronMetricsClient):    The metrics client from which
                                                to extract receive count data.
        topology_id (str):  The topology identification string.
        cluster (str): The cluster the topology is running on.
        environ (str): The environment the topology is running in.
        start (dt.datetime):    The UTC datetime object for the start of the
                                metrics gathering window.
        end (dt.datetime):  The UTC datetime object for the end of the metrics
                            gathering window.

    Returns:
        pandas.DataFrame: A DataFrame with the following columns:

        * source_component: The source instance's component name.
        * source_task: The source instance's task ID.
        * stream: The stream ID string for the outgoing stream from the source.
        * destination_component: The destination instance's component name.
        * destination_task: The destination instance's task ID.
        * routing_probability: The probability (between 0 and 1) that a tuple
          leaving the source instance on the specified stream will be routed to
          the destination instance.
    """

    LOG.info("Calculating instance to instance routing probabilities for "
             "topology %s for period from %s to %s", topology_id,
             start.isoformat(), end.isoformat())

    # Get the receive counts for the topology.
    # NOTE(review): the frame must provide the columns source_component,
    # source_task, stream, component, task and receive_count, as they are all
    # referenced below -- confirm against the metrics client implementation.
    rec_counts: pd.DataFrame = metrics_client.get_receive_counts(
        topology_id, cluster, environ, start, end)

    source_stream_keys = ["source_component", "source_task", "stream",
                          "component"]

    # Get the instance to instance transfers: the number of tuples each
    # destination task received from each source task on each stream.
    transfer_counts: pd.DataFrame = (
        rec_counts.groupby(source_stream_keys + ["task"])["receive_count"]
        .sum().reset_index()
        .rename(columns={"receive_count": "transfer_count"}))

    # Get the total emitted by each source instance onto each stream, per
    # destination component (summed over all of that component's tasks).
    total_emissions: pd.DataFrame = (
        rec_counts.groupby(source_stream_keys)["receive_count"]
        .sum().reset_index()
        .rename(columns={"receive_count": "total_emitted"}))

    # Merge the total emissions from each instance and the total transferred
    # between instances into a single DataFrame
    merged_counts: pd.DataFrame = total_emissions.merge(
        transfer_counts, on=source_stream_keys)

    # Calculate the routing probability. A 0 / 0 division (nothing was
    # emitted) yields NaN, which is reported as a probability of 0. The result
    # is assigned back to the column instead of calling fillna(inplace=True)
    # on the selected column: that chained form does not update the DataFrame
    # when pandas Copy-on-Write is active.
    merged_counts["routing_probability"] = (
        merged_counts["transfer_count"] / merged_counts["total_emitted"]
    ).fillna(0)

    # index=str is kept so that the returned frame carries the same string
    # index labels as before.
    merged_counts = merged_counts.rename(
        index=str,
        columns={"component": "destination_component",
                 "task": "destination_task"})

    return merged_counts[["source_component", "source_task", "stream",
                          "destination_component", "destination_task",
                          "routing_probability"]]