traffic_provider/predicted_traffic.py:

import datetime as dt
import logging

from typing import Any, Dict

import pandas as pd

from caladrius.traffic_provider.trafficprovider import TrafficProvider
from caladrius.graph.gremlin.client import GremlinClient
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.model.traffic.heron.base import HeronTrafficModel
from caladrius.model.traffic.heron.prophet import ProphetTrafficModel
from caladrius.model.topology.heron.helpers import convert_throughput_to_inter_arr_times

LOG: logging.Logger = logging.getLogger(__name__)


class PredictedTraffic(TrafficProvider):
    """This traffic provider takes in user-provided information and uses it to create a
    HeronTrafficModel that can be used to predict and supply arrival rate information about
    future traffic."""

    def __init__(self, metrics_client: HeronMetricsClient, graph_client: GremlinClient,
                 topology_id: str, cluster: str, environ: str, start: dt.datetime,
                 end: dt.datetime, traffic_config: Dict[str, Any], **other_kwargs) -> None:

        self.topology_id = topology_id
        self.cluster = cluster
        self.environ = environ
        self.start = start
        self.end = end
        self.kwargs = other_kwargs
        self.graph_client: GremlinClient = graph_client
        self.metrics_client: HeronMetricsClient = metrics_client

        self.arrival_rate = None
        self.inter_arrival_time = None
        self.tuple_arrival = None

        model: HeronTrafficModel = ProphetTrafficModel(traffic_config, self.metrics_client,
                                                       self.graph_client)

        # This data structure contains the predicted traffic per instance per second.
        self.prediction_results = model.predict_traffic(topology_id, cluster, environ,
                                                        **other_kwargs)
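    # A minimal sketch of the layout this class assumes for self.prediction_results. The
    # exact schema comes from ProphetTrafficModel.predict_traffic, so the task ids, stream
    # names and values below are illustrative only: a nested mapping of summary statistic
    # -> task id -> stream name -> predicted tuples arriving per second, e.g.
    #
    #   {"instances": {"mean": {1: {"default": 250.0, "ack": 10.0},
    #                           2: {"default": 300.0}}}}
    #
    # arrival_rates() sums each task's per-stream rates and divides by 1000 to convert
    # tuples per second into tuples per millisecond.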
    def tuple_arrivals(self):
        """This function returns the number of tuples that arrive at an instance per minute."""
        # Note that these are tuple arrival values for future traffic, i.e. the predictions
        # themselves, so we shouldn't use them to validate queue sizes.
        if self.arrival_rate is None:
            self.arrival_rates()

        self.tuple_arrival = self.arrival_rate.copy()
        # Convert the per-millisecond arrival rates back to tuple arrivals per minute.
        self.tuple_arrival['num-tuples'] = self.arrival_rate['mean_arrival_rate'] * 60 * 1000
        self.tuple_arrival.drop(["mean_arrival_rate"], axis=1, inplace=True)
        return self.tuple_arrival

    def arrival_rates(self) -> pd.DataFrame:
        """This function returns the number of tuples that arrive at an instance per ms."""
        # Arrival rates are expressed as the number of tuples that arrive per millisecond.
        # As the prediction_results data is per second, we divide by 1000 to get
        # per-millisecond values. Resulting format --> task, mean_arrival_rate
        mean_rates = self.prediction_results["instances"]["mean"]
        rows = []
        for task, rates in mean_rates.items():
            data_across_streams = sum(rates.values())
            rows.append({'task': task, 'mean_arrival_rate': data_across_streams / 1000})
        df: pd.DataFrame = pd.DataFrame(rows, columns=['task', 'mean_arrival_rate'])
        self.arrival_rate = df
        return df

    def inter_arrival_times(self):
        """This function returns the time between the arrival of two subsequent tuples in ms."""
        # Resulting format --> task, mean_inter_arrival_time, std_inter_arrival_time
        if self.inter_arrival_time is None:
            if self.tuple_arrival is None:
                self.tuple_arrivals()
            self.inter_arrival_time = convert_throughput_to_inter_arr_times(self.tuple_arrival)
        return self.inter_arrival_time

    def service_times(self):
        """TODO: Also predict service times for all components!"""
        bolt_service_times = self.metrics_client.get_service_times(self.topology_id, self.cluster,
                                                                    self.environ, self.start,
                                                                    self.end, **self.kwargs)
        if bolt_service_times.empty:
            raise Exception("Service times for the topology's bolts are unavailable")
        # Drop the component and stream columns, keeping only per-task service times.
        bolt_service_times.drop(["component", "stream"], axis=1, inplace=True)
        return bolt_service_times
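# A minimal sketch of the unit conversion this provider relies on, assuming that
# convert_throughput_to_inter_arr_times (defined in the topology helpers module, not shown
# here) simply inverts throughput: given num-tuples arriving per minute, the mean gap
# between consecutive tuples in milliseconds is roughly
#
#   mean_inter_arrival_time_ms = (60 * 1000) / num_tuples_per_minute
#
# e.g. 120,000 tuples per minute -> 0.5 ms between tuples. The std_inter_arrival_time
# column mentioned in inter_arrival_times() is produced by that helper and cannot be
# derived from the mean alone.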