def calculate()

in graph/analysis/heron/arrival_rates.py [0:0]


def calculate(graph_client: GremlinClient, metrics_client: HeronMetricsClient,
              topology_id: str, cluster: str, environ: str, topology_ref: str,
              start: dt.datetime, end: dt.datetime, io_bucket_length: int,
              tracker_url: str, spout_state: Dict[int, Dict[str, float]],
              **kwargs: Union[str, int, float]
              ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        metrics_client (HeronMetricsClient):    The client instance for the
                                                metrics database.
        topology_id (str):  The topology identification string.
        cluster: (str): The cluster the topology is running on.
        environ (str): The environment the topology is running in.
        topology_ref (str): The reference string for the topology physical
                            graph to be used in the calculations.
        start (dt.datetime):    The UTC datetime instance representing the
                                start of the metric gathering window.
        end (dt.datetime):  The UTC datetime instance representing the end of
                            the metric gathering window.
        io_bucket_length (int): The length in seconds that metrics should be
                                aggregated for use in IO ratio calculations.
        tracker_url (str):  The URL for the Heron Tracker API
        spout_state (dict): A dictionary mapping from instance task id to a
                            dictionary that maps from output stream name to the
                            output rate for that spout instance. The units of
                            this rate (TPS, TPM etc) will be the same for the
                            arrival rates.
        **kwargs:   Any additional key word arguments required by the metrics
                    client query methods. NOTE: This is passed to a cached
                    method so all kwargs must be hashable. Un-hashable
                    arguments will be removed before being supplied.

    Returns:
        pd.DataFrame:   A DataFrame containing the arrival rate at each
                        instance.
        pd.DataFrame:   A DataFrame containing the input and output rate of
                        each stream  manager.

    Raises:
        RuntimeError:   If there is no entry in the graph database for the
                        supplied topology id and ref.
    """

    # First check that there is a physical graph for the supplied reference in
    # the graph database
    graph_client.raise_if_missing(topology_id, topology_ref)

    LOG.info("Calculating arrival rates for topology %s reference %s using "
             "metrics from a %d second period from %s to %s", topology_id,
             topology_ref, (end-start).total_seconds(), start.isoformat(),
             end.isoformat())

    i2i_rps, levels, coefficients, sending_instances, receiving_instances = \
        _setup_arrival_calcs(metrics_client, graph_client, topology_id,
                             cluster, environ, topology_ref, start, end,
                             io_bucket_length, tracker_url, **kwargs)

    topo_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    arrival_rates: ARRIVAL_RATES = defaultdict(lambda: defaultdict(float))
    output_rates: OUTPUT_RATES = defaultdict(dict)
    output_rates.update(spout_state)

    # Step through the tree levels and calculate the output from each level and
    # the arrivals at the next. Skip the final level as its arrival rates are
    # calculated in the previous step and it has no outputs.
    for level_number, level in enumerate(levels[:-1]):

        LOG.debug("Processing topology level %d", level_number)

        if level_number != 0:
            # If this is not a spout level then we need to calculate the output
            # from the instances in this level.
            for source_vertex in level:

                output_rates = _calculate_outputs(topo_traversal,
                                                  source_vertex, arrival_rates,
                                                  output_rates, coefficients)

        # Calculate the arrival rates at the instances down stream on the next
        # level down
        for source_vertex in level:

            arrival_rates = _calculate_arrivals(topo_traversal, source_vertex,
                                                arrival_rates, output_rates,
                                                i2i_rps)

    # At this stage we have the output and arrival amount for all logically
    # connected elements. We now need to map these on to the stream managers to
    # calculate their incoming and outgoing tuple rates.
    strmgr_in_out: pd.DataFrame = _calc_strmgr_in_out(
        sending_instances, receiving_instances, output_rates, arrival_rates)

    return _convert_arrs_to_df(arrival_rates), strmgr_in_out