def _setup_arrival_calcs()

in graph/analysis/heron/arrival_rates.py [0:0]


def _setup_arrival_calcs(metrics_client: HeronMetricsClient,
                         graph_client: GremlinClient,
                         topology_id: str, cluster: str, environ: str,
                         topology_ref: str, start: dt.datetime,
                         end: dt.datetime, io_bucket_length: int,
                         tracker_url: str, **kwargs: Union[str, int, float]
                         ) -> Tuple[pd.Series, List[List[Vertex]],
                                    pd.Series, Dict[str, List[int]],
                                    Dict[str, List[int]]]:
    """ Helper method which sets up the data needed for the arrival rate
    calculations. These data are not affected by the traffic (spout_state)
    and so do not need to be recalculated for a new traffic level for the same
    topology id/ref. It is kept as a separate function so that its results can
    be cached (NOTE(review): no caching decorator is visible on this
    definition -- confirm where the caching is applied).

    Arguments:
        metrics_client:     Client passed to the routing probability and
                            input/output ratio calculations.
        graph_client:       Client used to fetch the topology subgraph; also
                            passed to the input/output ratio calculation.
        topology_id:        Identifier of the topology.
        cluster:            Cluster name forwarded to the metrics calculations.
        environ:            Environment name forwarded to the metrics
                            calculations.
        topology_ref:       Reference selecting the topology subgraph.
        start:              Start of the metrics gathering period.
        end:                End of the metrics gathering period.
        io_bucket_length:   Forwarded to ``lstsq_io_ratios`` (presumably the
                            length of the time buckets used for the regression
                            -- confirm against that function).
        tracker_url:        Forwarded to ``calculate_inter_instance_rps``.
        **kwargs:           Extra options forwarded to both metrics
                            calculations.

    Returns:
        A 5-tuple of:
        * The "routing_probability" series indexed by
          (source_task, destination_task, stream).
        * The vertex levels of the logical graph as returned by
          ``get_levels``.
        * The "coefficient" series indexed by
          (task, output_stream, input_stream, source_component).
        * A dict mapping each stream manager's "id" to the list of "task_id"
          values of the spout/bolt vertices with a "physically_connected" edge
          into that stream manager (the sending instances).
        * A dict mapping each stream manager's "id" to the list of "task_id"
          values of the bolt vertices that stream manager has a
          "physically_connected" edge out to (the receiving instances).
    """

    topo_traversal: GraphTraversalSource = \
        graph_client.topology_subgraph(topology_id, topology_ref)

    # Calculate the routing probabilities for the defined metric gathering
    # period. Selecting the single column turns the frame into a Series.
    i2i_rps: pd.Series = calculate_inter_instance_rps(
        metrics_client, topology_id, cluster, environ, start, end, tracker_url,
        **kwargs).set_index(["source_task", "destination_task",
                             "stream"])["routing_probability"]

    # Get the vertex levels for the logical graph tree
    LOG.info("Calculating levels for topology %s reference %s", topology_id,
             topology_ref)
    levels: List[List[Vertex]] = get_levels(topo_traversal)
    LOG.debug("Found %d levels in topology %s reference %s", len(levels),
              topology_id, topology_ref)

    # Calculate the input/output ratios for each instance using data from the
    # defined metrics gathering period. Again a single column -> Series.
    coefficients: pd.Series = lstsq_io_ratios(
        metrics_client, graph_client, topology_id, cluster, environ, start,
        end, io_bucket_length, **kwargs).set_index(
            ["task", "output_stream", "input_stream",
             "source_component"])["coefficient"]

    # Get the details of the incoming and outgoing physical connections for
    # each stream manager in the topology.

    # Map each stream manager id to the task ids of the instances (spouts and
    # bolts) that send tuples to it.
    sending_instances: Dict[str, List[int]] = \
        (topo_traversal.V().hasLabel("stream_manager")
         .group().by("id").by(in_("physically_connected")
                              .hasLabel(P.within("spout", "bolt"))
                              .values("task_id")
                              .fold())
         .next())

    # Map each stream manager id to the task ids of the instances (bolts only)
    # that receive tuples from it.
    receiving_instances: Dict[str, List[int]] = \
        (topo_traversal.V().hasLabel("stream_manager")
         .group().by("id").by(out("physically_connected")
                              .hasLabel("bolt").values("task_id").fold())
         .next())

    return (i2i_rps, levels, coefficients, sending_instances,
            receiving_instances)