def validate_queue_size()

in model/topology/heron/helpers.py [0:0]


def validate_queue_size(execute_counts: pd.DataFrame, tuple_arrivals: pd.DataFrame) -> pd.DataFrame:
    """
    Approximate the queue size per instance at the stream manager.

    The estimate simply compares how many tuples arrive per minute with how
    many tuples are processed per minute. Roughly, the size of the queue
    should be (tuples arrived from the last minute + the current minute -
    tuples executed in the current minute). This is only meant to be a rough
    estimate to validate the queue sizes returned by the queueing theory
    models.

    Two quantities are derived:

    * ``rough-diff``: the per-row difference ``num-tuples - execute_count``,
      stored as a new column on the merged frame that is returned.
    * ``actual-queue-size``: a per-task running estimate, whose per-task mean
      is only written to the log (it is not part of the return value).

    :param execute_counts: number of tuples executed per instance; must
        provide the columns ``task``, ``timestamp`` and ``execute_count``
    :param tuple_arrivals: number of tuples that have arrived at the stream
        manager per instance; must provide the columns ``task``,
        ``timestamp`` and ``num-tuples``
    :return: the inner join of both inputs on ``task`` and ``timestamp``,
        restricted to the columns ``task``, ``execute_count``,
        ``num-tuples`` and ``timestamp``, plus the ``rough-diff`` column
    """
    merged: pd.DataFrame = execute_counts.merge(
        tuple_arrivals, on=["task", "timestamp"]
    )[["task", "execute_count", "num-tuples", "timestamp"]]
    merged["rough-diff"] = merged["num-tuples"] - merged["execute_count"].astype(np.float64)

    result_columns = ["task", "actual-queue-size", "timestamp"]

    # Collect one small frame per task and concatenate once at the end.
    # DataFrame.append was removed in pandas 2.0 and copied the whole frame on
    # every call, so it must not be used row by row.
    per_task_frames = []
    # NOTE(review): rows are consumed in the order they appear in the merged
    # frame; this presumably relies on the inputs being chronologically
    # ordered per task -- confirm against the callers.
    for _, group in merged.groupby("task"):
        arrived = group["num-tuples"].to_numpy(dtype=np.float64)
        executed = group["execute_count"].to_numpy(dtype=np.float64)

        # Per-row change of the queue: arrivals minus executions ...
        deltas = arrived - executed
        # ... except that the first row only seeds the queue with its arrivals
        deltas[0] = arrived[0]
        # ... and the last row (when it is not also the first) only drains it.
        if len(deltas) > 1:
            deltas[-1] = -executed[-1]

        per_task_frames.append(pd.DataFrame({
            "task": group["task"].iloc[0],
            "actual-queue-size": np.cumsum(deltas),
            "timestamp": group["timestamp"].to_numpy(),
        }))

    if per_task_frames:
        df: pd.DataFrame = pd.concat(per_task_frames, ignore_index=True)[result_columns]
    else:
        # No overlapping (task, timestamp) rows: keep the expected schema.
        df = pd.DataFrame(columns=result_columns)
    # Guarantee a numeric dtype so the mean below is well defined even when
    # the frame is empty.
    df["actual-queue-size"] = df["actual-queue-size"].astype(np.float64)

    LOG.info(df.groupby("task")[["actual-queue-size"]].mean())
    return merged