in graph/analysis/heron/arrival_rates.py [0:0]
def _calculate_arrivals(topo_traversal: GraphTraversalSource,
source_vertex: Vertex, arrival_rates: ARRIVAL_RATES,
output_rates: DefaultDict[int, Dict[str, float]],
i2i_rps: pd.DataFrame) -> ARRIVAL_RATES:
# Get all downstream edges and vertices for this source vertex
out_edges: List[Dict[str, Union[str, int, float]]] = \
(topo_traversal.V(source_vertex).outE("logically_connected")
.project("source_task", "source_component", "stream_name",
"destination_task", "destination_component")
.by(outV().properties("task_id").value())
.by(outV().properties("component").value())
.by(properties("stream").value())
.by(inV().properties("task_id").value())
.by(inV().properties("component").value())
.toList())
if not out_edges:
return arrival_rates
source_task: int = cast(int, out_edges[0]["source_task"])
source_component: str = cast(str, out_edges[0]["source_component"])
LOG.debug("Processing output from source instance %s_%d",
source_component, source_task)
for out_edge in out_edges:
stream: str = cast(str, out_edge["stream_name"])
try:
stream_output: float = cast(float,
output_rates[source_task][stream])
except KeyError:
LOG.debug("No output rate information for source task %d on "
"stream %s. Skipping the outgoing edge", source_task,
stream)
continue
destination_task: int = cast(int, out_edge["destination_task"])
try:
r_prob: float = float(i2i_rps.loc(axis=0)[source_task,
destination_task,
stream])
except KeyError:
LOG.debug("Unable to find routing probability for connection from "
"task %d to %d on stream %s", source_task,
destination_task, stream)
edge_output: float = 0.0
else:
edge_output = (stream_output * r_prob)
LOG.debug("Output from %s-%d to %s-%d on stream %s is "
"calculated as %f * %f = %f", source_component,
source_task, out_edge["destination_component"],
destination_task, stream, stream_output, r_prob,
edge_output)
arrival_rates[destination_task][
(stream, source_component)] += edge_output
return arrival_rates