model/topology/heron/queueing_theory.py

# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0

""" This module contains classes and methods for modelling the performance
of Heron topologies using queueing theory. """

import logging
import datetime as dt

import pandas as pd

from typing import Any, cast, Dict, Optional, Tuple

from caladrius.model.topology.heron.base import HeronTopologyModel
from caladrius.model.topology.heron.abs_queueing_models import QueueingModels
from caladrius.model.topology.heron.queueing_models import MMCQueue, GGCQueue
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.graph.gremlin.client import GremlinClient
from caladrius.graph.analysis.heron import arrival_rates
from caladrius.graph.utils.heron import graph_check, read_paths
from caladrius.performance_prediction.predictor import Predictor
from caladrius.performance_prediction.simple_predictor import SimplePredictor
from caladrius.traffic_provider.trafficprovider import TrafficProvider

LOG: logging.Logger = logging.getLogger(__name__)


class QTTopologyModel(HeronTopologyModel):
    """ This model implementation predicts topology performance using
    queueing theory.

    NOTE: This is very much a work in progress and so far only does
    comparisons of arrival rates and service times to check QT model
    validity.
    """

    name: str = "queueing_theory"

    description: str = ("Models the topology as a queueing network and "
                        "flags if back pressure is likely at instances.")

    def __init__(self, config: Dict[str, Any],
                 metrics_client: HeronMetricsClient,
                 graph_client: GremlinClient) -> None:
        super().__init__(config, metrics_client, graph_client)
        self.metrics_client: HeronMetricsClient
        self.tracker_url: str = config["heron.tracker.url"]

    def predict_arrival_rates(self, topology_id: str, cluster: str,
                              environ: str,
                              spout_traffic: Dict[int, Dict[str, float]],
                              start: dt.datetime, end: dt.datetime,
                              metric_bucket_length: int,
                              topology_ref: Optional[str] = None,
                              **kwargs: Any
                              ) -> Tuple[pd.DataFrame, pd.DataFrame]:

        if not topology_ref:
            # Get the reference of the latest physical graph entry for this
            # topology, or create a physical graph if there is none.
            topology_ref = graph_check(self.graph_client, self.config,
                                       self.tracker_url, cluster, environ,
                                       topology_id)

        # Predict the arrival rates for all elements
        instance_ars: pd.DataFrame
        strmgr_ars: pd.DataFrame
        instance_ars, strmgr_ars = arrival_rates.calculate(
            self.graph_client, self.metrics_client, topology_id, cluster,
            environ, topology_ref, start, end, metric_bucket_length,
            self.tracker_url, spout_traffic, **kwargs)

        # Sum the arrivals from each source component of each incoming stream
        in_ars: pd.DataFrame = \
            (instance_ars.groupby(["task", "incoming_stream"]).sum()
             .reset_index().rename(index=str,
                                   columns={"incoming_stream": "stream"}))

        return in_ars, strmgr_ars
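
    # A minimal, illustrative sketch of the per-stream aggregation performed
    # in predict_arrival_rates above. The toy DataFrame and this helper are
    # assumptions for demonstration only; they are not part of the model API.
    @staticmethod
    def _arrival_rate_aggregation_example() -> pd.DataFrame:
        """Sums the arrival rates contributed by each source component to
        every (task, stream) pair, mirroring the groupby/rename step above.
        Tasks 1 and 2 each read one stream; task 1 has two upstream sources
        whose rates (10.0 and 5.0 tps) are summed to 15.0 tps."""
        instance_ars: pd.DataFrame = pd.DataFrame(
            {"task": [1, 1, 2],
             "incoming_stream": ["default", "default", "default"],
             "arrival_rate": [10.0, 5.0, 7.5]})
        return (instance_ars.groupby(["task", "incoming_stream"]).sum()
                .reset_index().rename(index=str,
                                      columns={"incoming_stream": "stream"}))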

    def find_current_instance_waiting_times(self, topology_id: str,
                                            cluster: str, environ: str,
                                            traffic_source: TrafficProvider,
                                            start: dt.datetime,
                                            end: dt.datetime,
                                            **kwargs: Any) -> list:

        LOG.info("Calculating end to end performance latency of topology "
                 "%s using queueing theory", topology_id)

        # Remove the start and end time kwargs so we don't supply them twice
        # to the metrics client.
        other_kwargs: Dict[str, Any] = {key: value
                                        for key, value in kwargs.items()
                                        if key not in ["start", "end"]}

        paths = read_paths(other_kwargs, topology_id, cluster, environ)
        queue: QueueingModels = GGCQueue(self.graph_client,
                                         self.metrics_client, paths,
                                         topology_id, cluster, environ,
                                         start, end, traffic_source,
                                         other_kwargs)
        return queue.end_to_end_latencies()

    def predict_current_performance(
            self, topology_id: str, cluster: str, environ: str,
            spout_traffic: Dict[int, Dict[str, float]],
            **kwargs: Any) -> pd.DataFrame:
        """
        Arguments:
            topology_id (str): The topology identification string.
            spout_traffic (dict): The expected output of the spout instances.
                These emit values should be in tuples per second (tps),
                otherwise they will not match the service time measurements.
        """
        # TODO: Check that the spout traffic keys are integers!

        start, end = get_start_end_times(**kwargs)

        metric_bucket_length: int = cast(int,
                                         self.config["metric.bucket.length"])

        LOG.info("Predicting traffic levels and back pressure of the "
                 "currently running topology %s using the queueing theory "
                 "model", topology_id)

        # Remove the start and end time kwargs so we don't supply them twice
        # to the metrics client.
        # TODO: We need to make this cleaner? Add start and end to the
        # topology model?
        other_kwargs: Dict[str, Any] = {key: value
                                        for key, value in kwargs.items()
                                        if key not in ["start", "end"]}

        # Get the service times for all elements
        service_times: pd.DataFrame = self.metrics_client.get_service_times(
            topology_id, cluster, environ, start, end, **other_kwargs)

        if service_times.empty:
            raise RuntimeError("Metrics client returned an empty data frame "
                               "for service times.")

        # Calculate the service rate (tuples per second) for each instance
        service_times["tuples_per_sec"] = \
            1.0 / (service_times["latency_ms"] / 1000.0)

        # Drop the system streams
        service_times = (service_times[~service_times["stream"]
                         .str.contains("__")])

        # Calculate the median service time and rate
        service_time_summary: pd.DataFrame = \
            (service_times[["task", "stream", "latency_ms",
                            "tuples_per_sec"]]
             .groupby(["task", "stream"]).median().reset_index())

        # Get the reference of the latest physical graph entry for this
        # topology, or create a physical graph if there is none.
        topology_ref: str = graph_check(self.graph_client, self.config,
                                        self.tracker_url, cluster, environ,
                                        topology_id)

        # Predict the arrival rate at all instances with the supplied spout
        # traffic
        in_ars, strmgr_ars = self.predict_arrival_rates(
            topology_id, cluster, environ, spout_traffic, start, end,
            metric_bucket_length, topology_ref)

        combined: pd.DataFrame = service_time_summary.merge(
            in_ars, on=["task", "stream"])

        # Capacity is the utilisation of each instance: the predicted
        # arrival rate as a percentage of the measured service rate. Above
        # 100% the instance cannot keep up and back pressure is likely.
        combined["capacity"] = (combined["arrival_rate"] /
                                combined["tuples_per_sec"]) * 100.0

        combined["back_pressure"] = combined["capacity"] > 100.0

        return combined
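
    # A minimal, illustrative sketch of the capacity calculation above. The
    # numbers and this helper are assumptions for demonstration only; they
    # are not part of the model API.
    @staticmethod
    def _capacity_example() -> pd.DataFrame:
        """Flags back pressure where the predicted arrival rate exceeds the
        measured service rate, i.e. where capacity exceeds 100%."""
        combined: pd.DataFrame = pd.DataFrame(
            {"task": [1, 2],
             "stream": ["default", "default"],
             "arrival_rate": [50.0, 120.0],      # predicted tuples per second
             "tuples_per_sec": [100.0, 100.0]})  # measured service rate
        combined["capacity"] = (combined["arrival_rate"] /
                                combined["tuples_per_sec"]) * 100.0
        combined["back_pressure"] = combined["capacity"] > 100.0
        # Task 1 -> 50% capacity (fine), task 2 -> 120% (back pressure)
        return combined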

    def predict_packing_plan(self, topology_id: str, cluster: str,
                             environ: str, start: dt.datetime,
                             end: dt.datetime,
                             traffic_provider: TrafficProvider,
                             **kwargs: Any) -> Dict[str, Any]:

        LOG.info("Calculating a new packing plan for topology %s, based on "
                 "its performance from %s to %s", topology_id, str(start),
                 str(end))

        # Remove the start and end time kwargs so we don't supply them twice
        # to the metrics client.
        other_kwargs: Dict[str, Any] = {key: value
                                        for key, value in kwargs.items()
                                        if key not in ["start", "end"]}

        paths = read_paths(other_kwargs, topology_id, cluster, environ)
        queue: QueueingModels = GGCQueue(self.graph_client,
                                         self.metrics_client, paths,
                                         topology_id, cluster, environ,
                                         start, end, traffic_provider,
                                         other_kwargs)
        p: Predictor = SimplePredictor(topology_id, cluster, environ, start,
                                       end, self.tracker_url,
                                       self.metrics_client,
                                       self.graph_client, queue,
                                       **other_kwargs)
        return p.create_new_plan()


def get_start_end_times(**kwargs: Any) -> Tuple[dt.datetime, dt.datetime]:

    if "start" in kwargs and "end" in kwargs:
        start_ts: int = int(kwargs["start"])
        start: dt.datetime = dt.datetime.utcfromtimestamp(start_ts)
        end_ts: int = int(kwargs["end"])
        end: dt.datetime = dt.datetime.utcfromtimestamp(end_ts)
        LOG.info("Start and end time stamps supplied, using metric "
                 "gathering period from %s to %s", start.isoformat(),
                 end.isoformat())
    elif "start" in kwargs and "end" not in kwargs:
        end = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
        start_ts = int(kwargs["start"])
        start = dt.datetime.utcfromtimestamp(start_ts)
        LOG.info("Only a start time (%s) was supplied. Setting the end time "
                 "to UTC now: %s", start.isoformat(), end.isoformat())
    elif "source_hours" in kwargs:
        end = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
        start = end - dt.timedelta(hours=int(kwargs["source_hours"]))
        LOG.info("Source hours provided, using metric gathering period "
                 "from %s to %s", start.isoformat(), end.isoformat())
    elif "source_mins" in kwargs:
        end = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
        start = end - dt.timedelta(minutes=int(kwargs["source_mins"]))
        LOG.info("Source mins provided, using metric gathering period "
                 "from %s to %s", start.isoformat(), end.isoformat())
    else:
        err_msg: str = ("None of the 'start', 'end', 'source_hours' or "
                        "'source_mins' keyword arguments were supplied. "
                        "Either 'start' on its own, 'start' and 'end' "
                        "together, 'source_hours' or 'source_mins' should "
                        "be provided.")
        LOG.error(err_msg)
        raise RuntimeError(err_msg)

    return start, end
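
# A minimal usage sketch of get_start_end_times; illustrative only and not
# part of the module API. The epoch-second timestamps below are assumed
# example values that parse to 2018-01-01 00:00 and 01:00 UTC.
def _get_start_end_times_example() -> None:
    start, end = get_start_end_times(start="1514764800", end="1514768400")
    assert end - start == dt.timedelta(hours=1)
    LOG.info("Example gathering period: %s to %s", start.isoformat(),
             end.isoformat())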