model/topology/heron/queueing_models.py (77 lines of code) (raw):

# Copyright 2018 Twitter, Inc. # Licensed under the Apache License, Version 2.0 # http://www.apache.org/licenses/LICENSE-2.0 """ This module models different queues and performs relevant calculations for it.""" import datetime as dt from functools import lru_cache import pandas as pd from caladrius.metrics.client import MetricsClient from caladrius.model.topology.heron.abs_queueing_models import QueueingModels from caladrius.model.topology.heron.helpers import * from caladrius.traffic_provider.trafficprovider import TrafficProvider from caladrius.graph.gremlin.client import GremlinClient LOG: logging.Logger = logging.getLogger(__name__) def littles_law(merged: pd.DataFrame) -> pd.DataFrame: """ This function applies Little's Law to find out the expected length of the queue (https://en.wikipedia.org/wiki/Little%27s_law) Arguments: merged (pd.DataFrame) : This dataframe is expected to contain task number, arrival rate, and tuple waiting time. """ merged["queue-size"] = merged["mean_waiting_time"] * merged["mean_arrival_rate"] return merged class MMCQueue(QueueingModels): """ This class in effect models an MM1 Queue. In queueing theory, an M/M/1 queue represents the queue length in a system having a single server where arrival times of new jobs can be described using a Poisson process and job service times can be described using an exponential distribution. An extension of this model is one with multiple servers (denoted by variable 'c') and is called an M/M/c queue. """ def __init__(self, graph_client: GremlinClient, metrics_client: MetricsClient, paths, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, other_kwargs: dict): """ This function initializes relevant variables to calculate queue related metrics given an M/M/c model. """ super().__init__(metrics_client, paths, topology_id, cluster, environ, start, end, other_kwargs) # ensure that paths are populated if len(self.paths) == 0: raise Exception("Topology paths are unavailable") # Get the service time for all elements service_times: pd.DataFrame = self.metrics_client.get_service_times( topology_id, cluster, environ, start, end, **other_kwargs) # Drop the system streams self.service_times = (service_times[~service_times["stream"].str.contains("__")]) # Get arrival rates per ms for all instances # We should not be using this any more. arrival_rate: pd.DataFrame = self.metrics_client.get_tuple_arrivals_at_stmgr( topology_id, cluster, environ, start, end, **other_kwargs) # Finding mean waiting time and validating queue size self.service_rate = convert_service_times_to_rates(service_times) self.arrival_rate = convert_arr_rate_to_mean_arr_rate(arrival_rate) def average_waiting_time(self) -> pd.DataFrame: merged: pd.DataFrame = self.service_rate.merge(self.arrival_rate, on=["task"]) merged["mean_waiting_time"] = merged["mean_arrival_rate"] / \ (merged["mean_service_rate"] * (merged["mean_service_rate"] - merged["mean_arrival_rate"])) return merged def average_queue_size(self) -> pd.DataFrame: merged: pd.DataFrame = self.service_rate.merge(self.arrival_rate, on=["task"]) merged["utilization"] = merged["mean_arrival_rate"]/merged["mean_service_rate"] merged["queue-size"] = (merged["utilization"] ** 2)/(1 - merged["utilization"]) return merged def end_to_end_latencies(self) -> list: merged: pd.DataFrame = self.average_waiting_time() queue_size: pd.DataFrame = self.average_queue_size() merged = merged.merge(queue_size, on=["task"])[["utilization", "task", "mean_waiting_time", "queue-size", "mean_arrival_rate_x"]] merged = merged.rename(columns={'mean_arrival_rate_x': 'mean_arrival_rate'}) return find_end_to_end_latencies(self.paths, merged, self.service_times) class GGCQueue(QueueingModels): """ This class in effect models an GG1 Queue. The G/G/1 queue represents the queue length in a system with a single server where interarrival times have a general (or arbitrary) distribution and service times have a (different) general distribution. This system can fit more realistic scenarios as arrival rates and processing rates do not necessarily fit probabilistic distributions (such as the Poisson distribution, used to describe arrival rates in M/M/1 queues). """ def __init__(self, graph_client: GremlinClient, metrics_client: MetricsClient, paths: List, topology_id: str, cluster: str, environ: str, start: dt.datetime, end: dt.datetime, traffic_provider: TrafficProvider, other_kwargs: dict): """ This function initializes relevant variables to calculate queue related metrics given a G/G/c model As both data arrival distributions and processing distributions are general, it is difficult to find an exact value for the waiting time. However, we can find a probable upperbound. There is a great deal of detail available here http://home.iitk.ac.in/~skb/qbook/Slide_Set_12.PDF and http://www.math.nsc.ru/LBRT/v1/foss/gg1_2803.pdf """ super().__init__(graph_client, metrics_client, paths, topology_id, cluster, environ, start, end, other_kwargs) # ensure that paths are populated if len(self.paths) == 0: raise Exception("Topology paths are unavailable") self.service_times = traffic_provider.service_times() self.service_stats: pd.DataFrame = process_execute_latencies(self.service_times) self.arrival_rate = traffic_provider.arrival_rates() self.inter_arrival_time_stats: pd.DataFrame = traffic_provider.inter_arrival_times() self.service_rate = convert_service_times_to_rates(self.service_times) self.queue_size = pd.DataFrame @lru_cache() def average_waiting_time(self) -> pd.DataFrame: # kingman's formula merged: pd.DataFrame = self.service_stats.merge(self.inter_arrival_time_stats, on=["task"]) merged["utilization"] = merged["mean_service_time"] / merged["mean_inter_arrival_time"] merged["coeff_var_arrival"] = merged["std_inter_arrival_time"] / merged["mean_inter_arrival_time"] merged["coeff_var_service"] = merged["std_service_time"] / merged["mean_service_time"] merged["mean_waiting_time"] = merged["utilization"] / (1 - merged["utilization"]) * \ merged["mean_service_time"] * ((merged["coeff_var_service"] ** 2 + merged["coeff_var_arrival"] ** 2) / 2) return merged def average_queue_size(self) -> pd.DataFrame: # complete by calling little's laws merged: pd.DataFrame = self.arrival_rate.merge(self.service_rate, on=["task"]) average_waiting_time = self.average_waiting_time() average_waiting_time = average_waiting_time[["task", "mean_waiting_time"]] merged = merged.merge(average_waiting_time, on=["task"]) self.queue_size: pd.DataFrame = littles_law(merged) # TODO: this is validation code and should be moved to test case runners # This is a rough validation because data arrives per minute # self.queue_size["scaled-queue-size"] =\ # self.queue_size["queue-size"] * 60 * 1000 # because it was calculated per ms # LOG.info(self.queue_size[["task", # "mean_waiting_time", "mean_arrival_rate", "scaled-queue-size", "queue-size"]]) # validate_queue_size(self.execute_counts, self.tuple_arrivals) return self.queue_size def end_to_end_latencies(self) -> list: merged: pd.DataFrame = self.average_waiting_time() # for validation only queue_size: pd.DataFrame = self.average_queue_size() subset: pd.DataFrame = queue_size[["task", "queue-size"]] merged = merged.merge(subset, on=["task"])[["utilization", "task", "mean_waiting_time", "queue-size"]] return find_end_to_end_latencies(self.paths, merged, self.service_times)