in graph/analysis/heron/arrival_rates.py [0:0]
def _calculate_outputs(topo_traversal: GraphTraversalSource,
source_vertex: Vertex,
arrival_rates: ARRIVAL_RATES,
output_rates: DefaultDict[int, Dict[str, float]],
coefficients: pd.Series,
) -> DefaultDict[int, Dict[str, float]]:
source_task: int = (topo_traversal.V(source_vertex)
.properties("task_id").value().next())
in_streams: List[Dict[str, str]] = \
(topo_traversal.V(source_vertex).inE("logically_connected")
.project("stream_name", "source_component")
.by(properties("stream").value())
.by(outV().properties("component").value())
.dedup()
.toList())
out_streams: List[str] = \
(topo_traversal.V(source_vertex)
.outE("logically_connected").values("stream")
.dedup().toList())
for out_stream in out_streams:
output_rate: float = 0.0
for in_stream in in_streams:
in_stream_name: str = in_stream["stream_name"]
source_component: str = in_stream["source_component"]
stream_arrivals: float = \
arrival_rates[source_task][(in_stream_name,
source_component)]
try:
coefficent: float = float(coefficients.loc[
source_task, out_stream, in_stream_name,
source_component])
except KeyError:
LOG.debug("No coefficient available for source task %d, "
"out stream %s, in stream %s from component %s",
source_task, out_stream, in_stream_name,
source_component)
else:
output_rate += (stream_arrivals * coefficent)
# It is possible that some of the IO coefficients may be negative,
# implying that the more you receive on an input stream the less you
# output to a given output stream. If we anticipate a large arrival on
# this negative input stream and low on other positive streams then it
# is possible that the predicted output rate could be negative (which
# is obviously meaningless).
if output_rate < 0.0:
output_rate = 0.0
output_rates[source_task][out_stream] = output_rate
return output_rates