model/topology/heron/helpers.py (75 lines of code) (raw):

""" This file contains helper functions. """

import logging

from typing import Dict, List

import pandas as pd
import numpy as np

LOG: logging.Logger = logging.getLogger(__name__)

# Number of milliseconds in the one-minute window over which tuple counts
# are reported.
_MS_PER_MINUTE: float = 60.0 * 1000


def convert_throughput_to_inter_arr_times(arrivals_per_min: pd.DataFrame) -> pd.DataFrame:
    """ Converts per-minute tuple counts into inter-arrival time statistics
    for each task.

    The inter-arrival time of one sample is the window length in milliseconds
    (60 000) divided by the number of tuples received in that window. A sample
    with zero tuples therefore produces an infinite inter-arrival time.

    :param arrivals_per_min: DataFrame with a "task" column and a
                             "num-tuples" column (one row per sample).
    :return: DataFrame with columns "task", "mean_inter_arrival_time" and
             "std_inter_arrival_time", one row per task. The standard
             deviation is NaN for tasks with a single sample.
    """
    columns = ['task', 'mean_inter_arrival_time', 'std_inter_arrival_time']

    # Rows are collected in a list and turned into a DataFrame once:
    # DataFrame.append was removed in pandas 2.0 and copied the whole frame
    # on every call before that.
    rows: List[dict] = []
    for task, samples in arrivals_per_min.groupby("task"):
        inter_arrival = _MS_PER_MINUTE / samples["num-tuples"]
        rows.append({'task': task,
                     'mean_inter_arrival_time': inter_arrival.mean(),
                     'std_inter_arrival_time': inter_arrival.std()})

    return pd.DataFrame(rows, columns=columns)


def process_execute_latencies(execute_latencies: pd.DataFrame) -> pd.DataFrame:
    """ Summarises the execute latency samples of each task.

    :param execute_latencies: DataFrame with a "task" column and a
                              "latency_ms" column (one row per sample).
    :return: DataFrame with columns "task", "mean_service_time" and
             "std_service_time", one row per task.
    """
    columns = ['task', 'mean_service_time', 'std_service_time']

    rows: List[dict] = []
    for task, samples in execute_latencies.groupby("task"):
        service_times = samples["latency_ms"]
        rows.append({'task': task,
                     'mean_service_time': service_times.mean(),
                     'std_service_time': service_times.std()})

    return pd.DataFrame(rows, columns=columns)


def convert_service_times_to_rates(latencies: pd.DataFrame) -> pd.DataFrame:
    """ Converts the latency samples of each task into a mean service rate,
    computed as the reciprocal of the task's mean "latency_ms".

    :param latencies: DataFrame with a "task" column and a "latency_ms"
                      column (one row per sample).
    :return: DataFrame with columns "task" and "mean_service_rate", one row
             per task.
    """
    columns = ['task', 'mean_service_rate']

    rows: List[dict] = []
    for task, samples in latencies.groupby("task"):
        rows.append({'task': task,
                     'mean_service_rate': 1 / samples["latency_ms"].mean()})

    return pd.DataFrame(rows, columns=columns)


def convert_arr_rate_to_mean_arr_rate(throughput: pd.DataFrame) -> pd.DataFrame:
    """ Converts per-minute tuple counts into a mean arrival rate per task.

    Each "num-tuples" sample is divided by 60 000 (the number of milliseconds
    in a minute), so the resulting rate is expressed in tuples per
    millisecond, and the samples of each task are then averaged.

    :param throughput: DataFrame with a "task" column and a "num-tuples"
                       column (one row per sample).
    :return: DataFrame with columns "task" and "mean_arrival_rate", one row
             per task.
    """
    columns = ['task', 'mean_arrival_rate']

    rows: List[dict] = []
    for task, samples in throughput.groupby("task"):
        arrival_rate = samples["num-tuples"] / _MS_PER_MINUTE
        rows.append({'task': task,
                     'mean_arrival_rate': arrival_rate.mean()})

    return pd.DataFrame(rows, columns=columns)


def find_end_to_end_latencies(paths: List[List[str]], waiting_times: pd.DataFrame,
                              service_times: pd.DataFrame) -> list:
    """
    This function goes through all end to end paths in the topology
    (from source to sink) and calculates the total end to end latency for
    each path. This end to end latency is a summation of
    execute latency + queue waiting time for each task in the path.

    :param paths: All end to end topology paths from source to sink
    :param waiting_times: DataFrame with a "task" column and a
                          "mean_waiting_time" column: the amount of time each
                          tuple has to wait in an instance's queue
    :param service_times: DataFrame with a "task" column and a "latency_ms"
                          column: the amount of time it takes an instance to
                          process a tuple (averaged per task here)
    :return: a list of {'path': <tuple of tasks>, 'latency': <total>} dicts,
             one per distinct path
    :raises IndexError: if a path contains a task for which either the
                        service time or the waiting time is missing
    """
    averaged_execute_latency = service_times[["task", "latency_ms"]] \
        .groupby("task").mean().reset_index()
    merged = averaged_execute_latency.merge(
        waiting_times, on=["task"])[["task", "mean_waiting_time", "latency_ms"]]

    # Build the per-task total once instead of filtering the frame for every
    # hop of every path. setdefault keeps the first row of a task, which is
    # the row a positional "first match" lookup would have picked.
    latency_by_task: dict = {}
    for task, execute_latency, waiting_time in zip(merged["task"].tolist(),
                                                   merged["latency_ms"].tolist(),
                                                   merged["mean_waiting_time"].tolist()):
        latency_by_task.setdefault(task, execute_latency + waiting_time)

    result = dict()
    for path in paths:
        end_to_end_latency = 0.0
        for task in path:
            if task not in latency_by_task:
                # IndexError is what the positional lookup used to raise for
                # an unknown task, so the exception type is kept for callers.
                raise IndexError(
                    "no service/waiting time available for task %r" % (task,))
            end_to_end_latency = latency_by_task[task] + end_to_end_latency
        result[tuple(path)] = end_to_end_latency

    return remap_keys(result)


def remap_keys(latencies_dict: Dict[tuple, np.float64]) -> List[dict]:
    """ Turns a {path: latency} mapping into a list of
    {'path': path, 'latency': latency} dicts (tuple keys cannot be used as
    keys of a JSON object).

    :param latencies_dict: mapping from a path (tuple of tasks) to its latency
    :return: one dict per entry, in the mapping's iteration order
    """
    return [{'path': k, 'latency': v} for k, v in latencies_dict.items()]


def validate_queue_size(execute_counts: pd.DataFrame,
                        tuple_arrivals: pd.DataFrame) -> pd.DataFrame:
    """
    This function approximates the queue size per instance at the stream
    manager by simply looking at how many tuples are processed per minute and
    how many tuples arrive per minute.
    Roughly, the size of the queue should be (tuples arrived from the last
    minute + the current minute - tuples executed in the current minute).
    This is only meant to be a rough estimate to validate the queue sizes
    returned by the queueing theory models.

    The per-task mean of the running estimate is only written to the log; it
    is not part of the returned frame.

    :param execute_counts: number of tuples executed per instance; needs the
                           columns "task", "timestamp" and "execute_count"
    :param tuple_arrivals: number of tuples that have arrived at the stream
                           manager per instance; needs the columns "task",
                           "timestamp" and "num-tuples"
    :return: the two inputs merged on ("task", "timestamp") with the columns
             "task", "execute_count", "num-tuples", "timestamp" and
             "rough-diff" (arrived minus executed for that row)
    """
    merged: pd.DataFrame = execute_counts.merge(
        tuple_arrivals, on=["task", "timestamp"])[
            ["task", "execute_count", "num-tuples", "timestamp"]]
    merged["rough-diff"] = merged["num-tuples"] - merged["execute_count"].astype(np.float64)

    rows: List[dict] = []
    for task, samples in merged.groupby("task"):
        arrived_counts = samples["num-tuples"].tolist()
        # Converting the column once also copes with values that are not
        # numpy scalars (per-element .astype would fail on those).
        executed_counts = samples["execute_count"].astype(np.float64).tolist()
        timestamps = samples["timestamp"].tolist()
        last = len(arrived_counts) - 1

        # Rows are visited in merge order.
        # NOTE(review): nothing here sorts by timestamp - confirm the inputs
        # are already in chronological order.
        queue_size = 0
        for position, (arrived, executed, timestamp) in enumerate(
                zip(arrived_counts, executed_counts, timestamps)):
            if position == 0:
                # First sample: nothing has been accounted as executed yet.
                queue_size = arrived
            elif position == last:
                # Final sample: only the executed tuples are subtracted.
                queue_size = queue_size - executed
            else:
                queue_size = queue_size + arrived - executed
            rows.append({'task': task,
                         'timestamp': timestamp,
                         'actual-queue-size': queue_size})

    estimates = pd.DataFrame(rows, columns=['task', 'actual-queue-size', 'timestamp'])
    # Force a numeric dtype so the mean is well defined even when there are
    # no rows (an empty column would otherwise have dtype object).
    estimates["actual-queue-size"] = estimates["actual-queue-size"].astype(np.float64)
    LOG.info(estimates.groupby("task")[["actual-queue-size"]].mean())

    return merged