def _calc_strmgr_in_out()

in graph/analysis/heron/arrival_rates.py [0:0]


def _calc_strmgr_in_out(sending_instances: Dict[str, List[int]],
                        receiving_instances: Dict[str, List[int]],
                        output_rates: OUTPUT_RATES,
                        arrival_rates: ARRIVAL_RATES) -> pd.DataFrame:

    strmgr_outgoing: Dict[str, float] = {}

    for stream_manager, sending_task_list in sending_instances.items():
        total_sent: float = 0.0
        for task_id in sending_task_list:
            total_sent += sum(output_rates[task_id].values())

        strmgr_outgoing[stream_manager] = total_sent

    strmgr_incoming: Dict[str, float] = {}

    for stream_manager, receiving_task_list in receiving_instances.items():
        total_receieved: float = 0.0
        for task_id in receiving_task_list:
            total_receieved += sum(arrival_rates[task_id].values())

        strmgr_incoming[stream_manager] = total_receieved

    # Convert the stream manager dictionaries into a DataFrame. It is possible
    # that a container could only hold spouts (in which chase would have no
    # entry in the incoming dict) or only sinks (therefore no entries in the
    # outgoing dict) so take the union of keys from both dicts and add None to
    # the DF if the key is missing
    strmgr_output: List[Dict[str, Union[str, float, None]]] = []
    for strmgr in (set(strmgr_incoming.keys()) or set(strmgr_outgoing.keys())):
        row: Dict[str, Union[str, float, None]] = {
            "id": strmgr,
            "incoming": strmgr_incoming.get(strmgr, None),
            "outgoing": strmgr_outgoing.get(strmgr, None)}
        strmgr_output.append(row)

    return pd.DataFrame(strmgr_output)