def _calculate_arrivals()

in graph/analysis/heron/arrival_rates.py [0:0]


def _calculate_arrivals(topo_traversal: GraphTraversalSource,
                        source_vertex: Vertex, arrival_rates: ARRIVAL_RATES,
                        output_rates: DefaultDict[int, Dict[str, float]],
                        i2i_rps: pd.DataFrame) -> ARRIVAL_RATES:
    """Add the output of one source instance to its downstream arrival rates.

    Every "logically_connected" edge leaving ``source_vertex`` is visited.
    For each edge, the source task's output rate on the edge's stream is
    multiplied by the routing probability of the (source task, destination
    task, stream) connection, and the product is added to the destination
    task's entry in ``arrival_rates``.

    Edges whose stream has no entry in ``output_rates`` are skipped
    entirely. Edges with no routing probability in ``i2i_rps`` still touch
    the destination entry, but contribute 0.0 to it.

    Arguments:
        topo_traversal: Traversal source used to query the topology graph.
        source_vertex: Vertex whose outgoing edges are processed.
        arrival_rates: Running totals, indexed first by destination task id
            and then by a ``(stream, source component)`` tuple. Updated in
            place. NOTE(review): the ``+=`` below assumes missing entries
            default to a number (e.g. nested defaultdicts) -- confirm
            against the definition of ARRIVAL_RATES.
        output_rates: Output rate of each task, indexed by task id and then
            by stream name.
        i2i_rps: Routing probabilities, looked up along axis 0 with a
            ``(source task, destination task, stream)`` key.

    Returns:
        The ``arrival_rates`` object that was passed in. It is returned
        untouched when the source vertex has no outgoing
        "logically_connected" edges.
    """

    # One record per outgoing edge, holding the task id and component name
    # of both endpoints together with the stream the edge carries.
    edge_records = (
        topo_traversal.V(source_vertex).outE("logically_connected")
        .project("source_task", "source_component", "stream_name",
                 "destination_task", "destination_component")
        .by(outV().properties("task_id").value())
        .by(outV().properties("component").value())
        .by(properties("stream").value())
        .by(inV().properties("task_id").value())
        .by(inV().properties("component").value()))
    edges: List[Dict[str, Union[str, int, float]]] = edge_records.toList()

    if not edges:
        return arrival_rates

    # All edges share the same source vertex, so the first record is enough
    # to identify the source task and component.
    first_edge = edges[0]
    source_task: int = cast(int, first_edge["source_task"])
    source_component: str = cast(str, first_edge["source_component"])

    LOG.debug("Processing output from source instance %s_%d",
              source_component, source_task)

    for edge in edges:
        stream_name: str = cast(str, edge["stream_name"])

        try:
            emitted_rate: float = cast(
                float, output_rates[source_task][stream_name])
        except KeyError:
            LOG.debug("No output rate information for source task %d on "
                      "stream %s. Skipping the outgoing edge", source_task,
                      stream_name)
            continue

        destination_task: int = cast(int, edge["destination_task"])
        routing_key = (source_task, destination_task, stream_name)

        try:
            routing_prob: float = float(i2i_rps.loc(axis=0)[routing_key])
        except KeyError:
            LOG.debug("Unable to find routing probability for connection from "
                      "task %d to %d on stream %s", source_task,
                      destination_task, stream_name)

            # No known route: the edge adds nothing to the destination.
            edge_rate: float = 0.0
        else:
            edge_rate = emitted_rate * routing_prob

            LOG.debug("Output from %s-%d to %s-%d on stream %s is "
                      "calculated as %f * %f = %f", source_component,
                      source_task, edge["destination_component"],
                      destination_task, stream_name, emitted_rate,
                      routing_prob, edge_rate)

        # Accumulate under the (stream, source component) pair so that input
        # from different upstream components stays separate.
        task_arrivals = arrival_rates[destination_task]
        task_arrivals[(stream_name, source_component)] += edge_rate

    return arrival_rates