in traffic_provider/current_traffic.py [0:0]
def __init__(self, metrics_client: HeronMetricsClient, graph_client: GremlinClient, topology_id: str,
cluster: str, environ: str, start: [dt.datetime], end: [dt.datetime],
traffic_config: Dict[str, Any], **other_kwargs) -> None:
self.graph_client = graph_client
self.metrics_client: HeronMetricsClient = metrics_client
self.topology = topology_id
self.cluster = cluster
self.environ = environ
self.start = start
self.end = end
self.kwargs = other_kwargs
self.tuples = self.metrics_client.get_tuple_arrivals_at_stmgr\
(self.topology, cluster, environ, start, end, **other_kwargs)
spouts = graph_client.graph_traversal.V().has("topology_id", self.topology). \
hasLabel("spout").where(outE("logically_connected")).properties('component').value().dedup().toList()
spout_queue_processing_rate = metrics_client.get_outgoing_queue_processing_rate(
topology_id, cluster, environ, start, end)
self.spout_queue_processing_rate = \
spout_queue_processing_rate.loc[spout_queue_processing_rate['component'].isin(spouts)]
num_tuples_added_to_spout_gateway_queue = metrics_client.get_out_going_queue_arrival_rate(
self.topology, cluster, environ, start, end)
self.num_tuples_added_to_spout_gateway_queue = \
num_tuples_added_to_spout_gateway_queue.loc[
num_tuples_added_to_spout_gateway_queue['component'].isin(spouts)]
spout_tuple_set_size = metrics_client.get_average_tuple_set_size_added_to_outgoing_queue(
self.topology, cluster, environ, start, end)
self.spout_tuple_set_size = spout_tuple_set_size.loc[spout_tuple_set_size['component'].isin(spouts)]
spout_arrival_rates = self.num_tuples_added_to_spout_gateway_queue.\
rename(index=str, columns={"tuples-added-to-queue": "num-tuples"})
self.spout_arrival_rates = spout_arrival_rates.\
merge(self.spout_tuple_set_size, on=["task", "component", "container", "timestamp"])
self.spout_arrival_rates["num-tuples"] = self.spout_arrival_rates["num-tuples"] *\
self.spout_arrival_rates["tuple-set-size"]