in model/topology/heron/helpers.py [0:0]
def validate_queue_size(execute_counts: pd.DataFrame, tuple_arrivals: pd.DataFrame) -> pd.DataFrame:
    """
    Roughly estimate the per-instance queue size at the stream manager.

    For every task, the number of tuples that arrived and the number of tuples
    executed in each timestamp (intended to be one-minute buckets) are compared.
    Rows of a task are walked in chronological order and a running estimate is
    updated as follows:

    * first timestamp:        queue = tuples arrived
    * intermediate timestamps: queue = previous queue + tuples arrived - tuples executed
    * last timestamp:         queue = previous queue - tuples executed

    This is only meant as a rough sanity check of the queue sizes returned by
    the queueing theory models. The per-task mean of the estimate is logged
    via ``LOG.info``; it is not returned.

    :param execute_counts: number of tuples executed per instance; must contain
        the columns ``task``, ``timestamp`` and ``execute_count``
    :param tuple_arrivals: number of tuples that have arrived at the stream
        manager per instance; must contain the columns ``task``, ``timestamp``
        and ``num-tuples``
    :return: the inner join of both inputs on ``task`` and ``timestamp`` with
        the columns ``task``, ``execute_count``, ``num-tuples``, ``timestamp``
        and ``rough-diff`` (``num-tuples - execute_count`` per row)
    """
    merged: pd.DataFrame = execute_counts.merge(tuple_arrivals, on=["task", "timestamp"])[
        ["task", "execute_count", "num-tuples", "timestamp"]]
    merged["rough-diff"] = merged["num-tuples"] - merged["execute_count"].astype(np.float64)

    records = []
    for _, group in merged.groupby(["task"]):
        # The running estimate only makes sense in time order; sort a copy of
        # the group (stable, so ties keep their original order) without
        # reordering the frame that is returned to the caller.
        ordered = group.sort_values("timestamp", kind="mergesort")
        task = ordered["task"].iloc[0]
        arrivals = ordered["num-tuples"].to_numpy().astype(np.float64)
        executed = ordered["execute_count"].to_numpy().astype(np.float64)
        timestamps = ordered["timestamp"].to_numpy()
        last = len(ordered) - 1

        queue = 0.0
        for i, (arrived, done, timestamp) in enumerate(zip(arrivals, executed, timestamps)):
            if i == 0:
                # A single-row group only takes this branch, as before.
                queue = arrived
            elif i == last:
                queue = queue - done
            else:
                queue = queue + arrived - done
            records.append({'task': task,
                            'timestamp': timestamp,
                            'actual-queue-size': queue})

    # Build the frame once (DataFrame.append was removed in pandas 2.0) and
    # force a numeric dtype so the mean below works even with no rows.
    df: pd.DataFrame = pd.DataFrame(records, columns=['task', 'actual-queue-size', 'timestamp'])
    df['actual-queue-size'] = df['actual-queue-size'].astype(np.float64)

    # LOG is presumably a module-level logger defined elsewhere in this file.
    LOG.info(df.groupby("task")[["actual-queue-size"]].mean())
    return merged