# traffic_provider/current_traffic.py
from caladrius.traffic_provider.trafficprovider import TrafficProvider
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.model.topology.heron.helpers import convert_arr_rate_to_mean_arr_rate, \
convert_throughput_to_inter_arr_times
from caladrius.graph.gremlin.client import GremlinClient
import datetime as dt
from gremlin_python.process.graph_traversal import outE
import pandas as pd
from typing import Dict, Any
class CurrentTraffic(TrafficProvider):
    """Provides current (observed) traffic information for a topology.

    Uses the metrics client to pull measured traffic. Unlike the predicted
    traffic provider, this also models spout traffic by combining the rate at
    which tuple sets arrive at each spout's outgoing (gateway) queue with the
    average number of tuples per set.
    """

    # traffic_config is unused here, but keeping it makes the constructor
    # signature identical across both traffic providers.
    def __init__(self, metrics_client: HeronMetricsClient, graph_client: GremlinClient, topology_id: str,
                 cluster: str, environ: str, start: dt.datetime, end: dt.datetime,
                 traffic_config: Dict[str, Any], **other_kwargs) -> None:
        self.graph_client = graph_client
        self.metrics_client: HeronMetricsClient = metrics_client
        self.topology = topology_id
        self.cluster = cluster
        self.environ = environ
        self.start = start
        self.end = end
        self.kwargs = other_kwargs

        # Bolt-side traffic: tuple arrivals observed at each stream manager.
        self.tuples = self.metrics_client.get_tuple_arrivals_at_stmgr(
            self.topology, cluster, environ, start, end, **other_kwargs)

        # Component names of spouts that are logically connected downstream.
        spouts = graph_client.graph_traversal.V().has("topology_id", self.topology). \
            hasLabel("spout").where(outE("logically_connected")).properties('component').value().dedup().toList()

        # Rate at which each spout's outgoing queue is drained, restricted to
        # the spout components found above.
        spout_queue_processing_rate = metrics_client.get_outgoing_queue_processing_rate(
            topology_id, cluster, environ, start, end)
        self.spout_queue_processing_rate = \
            spout_queue_processing_rate.loc[spout_queue_processing_rate['component'].isin(spouts)]

        # Rate at which tuple sets are added to each spout's gateway queue.
        num_tuples_added_to_spout_gateway_queue = metrics_client.get_out_going_queue_arrival_rate(
            self.topology, cluster, environ, start, end)
        self.num_tuples_added_to_spout_gateway_queue = \
            num_tuples_added_to_spout_gateway_queue.loc[
                num_tuples_added_to_spout_gateway_queue['component'].isin(spouts)]

        # Average number of tuples per set added to the outgoing queue.
        spout_tuple_set_size = metrics_client.get_average_tuple_set_size_added_to_outgoing_queue(
            self.topology, cluster, environ, start, end)
        self.spout_tuple_set_size = spout_tuple_set_size.loc[spout_tuple_set_size['component'].isin(spouts)]

        # Spout arrival rate in tuples = (tuple sets per interval) * (tuples per set).
        spout_arrival_rates = self.num_tuples_added_to_spout_gateway_queue.rename(
            index=str, columns={"tuples-added-to-queue": "num-tuples"})
        self.spout_arrival_rates = spout_arrival_rates.merge(
            self.spout_tuple_set_size, on=["task", "component", "container", "timestamp"])
        self.spout_arrival_rates["num-tuples"] = \
            self.spout_arrival_rates["num-tuples"] * self.spout_arrival_rates["tuple-set-size"]

    def tuple_arrivals(self):
        """Return the raw tuple-arrival data observed at the stream managers."""
        return self.tuples

    def arrival_rates(self):
        """Return mean arrival rates for both spout and bolt instances."""
        bolt_arrival_rates = convert_arr_rate_to_mean_arr_rate(self.tuples)
        spout_arrival_rates = convert_arr_rate_to_mean_arr_rate(self.spout_arrival_rates)
        # DataFrame.append was removed in pandas 2.0; pd.concat is the
        # supported equivalent (sort=True sorts the column axis as before).
        return pd.concat([spout_arrival_rates, bolt_arrival_rates], sort=True)

    def inter_arrival_times(self):
        """Return inter-arrival times for both spout and bolt instances."""
        bolt_inter_arrival_times = convert_throughput_to_inter_arr_times(self.tuples)
        spout_inter_arrival_times = convert_throughput_to_inter_arr_times(self.spout_arrival_rates)
        # pd.concat replaces the removed DataFrame.append (pandas 2.0).
        return pd.concat([bolt_inter_arrival_times, spout_inter_arrival_times], sort=True)

    def service_times(self):
        """
        This function finds out the service times for a topology's spouts and merges them with the service times of bolts.
        Spouts don't actually process tuples -- however, they do have a queue that stores outgoing tuples.
        Here, we meant the amount of time it takes per tuple to flush out tuples in the queue. Times are
        returned in ms
        :return: a dataframe of processing latencies
        :raises Exception: if no bolt service times are available for the topology
        """
        merged = self.spout_queue_processing_rate. \
            merge(self.num_tuples_added_to_spout_gateway_queue, on=["timestamp", "component", "task", "container"]). \
            merge(self.spout_tuple_set_size, on=["timestamp", "component", "task", "container"])
        # Collect rows in a list and build the DataFrame once: appending
        # row-by-row inside the loop is quadratic (and DataFrame.append was
        # removed in pandas 2.0).
        rows = []
        for _, data in merged.iterrows():
            # Tuples flushed from the gateway queue per minute.
            processed_tuples = data["instance-processing-rate"] * data["tuple-set-size"]
            if processed_tuples > 0:
                # Milliseconds needed per tuple (metrics interval is one minute).
                latency = (60 * 1000) / processed_tuples
                rows.append({'task': data["task"], 'latency_ms': latency,
                             'component': data["component"], "container": data["container"],
                             "timestamp": data["timestamp"]})
        df: pd.DataFrame = pd.DataFrame(
            rows, columns=['task', 'latency_ms', "timestamp", "component", "container"])
        bolt_service_times = self.metrics_client.get_service_times(self.topology, self.cluster,
                                                                   self.environ, self.start, self.end, **self.kwargs)
        if bolt_service_times.empty:
            raise Exception("Service times for the topology's bolts are unavailable")
        bolt_service_times = bolt_service_times.drop(["stream"], axis=1)
        return pd.concat([df, bolt_service_times], sort=True)