in graph/analysis/heron/arrival_rates.py [0:0]
def _calc_strmgr_in_out(sending_instances: Dict[str, List[int]],
                        receiving_instances: Dict[str, List[int]],
                        output_rates: OUTPUT_RATES,
                        arrival_rates: ARRIVAL_RATES) -> pd.DataFrame:
    """ Sums the per-task output and arrival rates for each stream manager.

    Arguments:
        sending_instances (dict):   Maps stream manager id to the list of task
                                    ids whose output is summed for it.
        receiving_instances (dict): Maps stream manager id to the list of task
                                    ids whose arrivals are summed for it.
        output_rates:   Indexed by task id; each entry must expose `.values()`
                        yielding numbers, which are summed per task.
        arrival_rates:  Indexed by task id; each entry must expose `.values()`
                        yielding numbers, which are summed per task.

    Returns:
        A DataFrame with one row per stream manager that appears in *either*
        input dictionary, sorted by id, with the columns "id", "incoming" and
        "outgoing". A stream manager missing from one of the inputs gets a
        null value in the corresponding column.

    Raises:
        KeyError:   If a listed task id is not present in the relevant rates
                    mapping.
    """

    # Start each sum from 0.0 so a stream manager with an empty task list
    # still reports a float total.
    strmgr_outgoing: Dict[str, float] = {
        stream_manager: sum(
            (sum(output_rates[task_id].values()) for task_id in task_ids),
            0.0)
        for stream_manager, task_ids in sending_instances.items()}

    strmgr_incoming: Dict[str, float] = {
        stream_manager: sum(
            (sum(arrival_rates[task_id].values()) for task_id in task_ids),
            0.0)
        for stream_manager, task_ids in receiving_instances.items()}

    # Convert the stream manager dictionaries into a DataFrame. It is possible
    # that a container could only hold spouts (in which case it would have no
    # entry in the incoming dict) or only sinks (therefore no entry in the
    # outgoing dict), so take the union of the keys from both dicts and add
    # None to the DF if the key is missing.
    # NOTE: this must be a set union ("|"). A boolean "or" would return only
    # the first non-empty key set and silently drop spout-only stream managers.
    all_strmgrs = sorted(strmgr_incoming.keys() | strmgr_outgoing.keys())

    strmgr_output: List[Dict[str, Union[str, float, None]]] = [
        {"id": strmgr,
         "incoming": strmgr_incoming.get(strmgr),
         "outgoing": strmgr_outgoing.get(strmgr)}
        for strmgr in all_strmgrs]

    # Name the columns explicitly so that an empty result still carries the
    # expected schema.
    return pd.DataFrame(strmgr_output,
                        columns=["id", "incoming", "outgoing"])