metrics/heron/client.py (87 lines of code) (raw):
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains abstract base classes for the heron metrics clients
"""
import datetime as dt
from abc import abstractmethod
from typing import Union
from pandas import DataFrame
from caladrius.metrics.client import MetricsClient
class HeronMetricsClient(MetricsClient):
    """ Abstract base class for all Heron metric client classes.

    Every method declared here is abstract and returns a pandas DataFrame;
    the layout of that DataFrame is defined by the concrete client and is
    not specified at this level.

    NOTE(review): ``@abstractmethod`` is only enforced at instantiation time
    if ``MetricsClient`` uses ``abc.ABCMeta`` as its metaclass -- confirm.

    Parameters shared by the methods of this interface:
        topology_id:    String identifying the topology to query.
        cluster:    Presumably the name of the cluster the topology runs on
                    -- confirm against the concrete clients.
        environ:    Presumably the environment the topology runs in --
                    confirm against the concrete clients.
        start:  Start of the time window over which metrics are gathered.
        end:    End of the time window over which metrics are gathered.
        **kwargs:   Additional str, int or float options whose meaning is
                    specific to the concrete client implementation.

    Several methods give ``start`` and ``end`` a default of None; how a
    missing bound is interpreted is left to the concrete client.
    """

    @abstractmethod
    def get_service_times(self, topology_id: str, cluster: str, environ: str,
                          start: dt.datetime, end: dt.datetime,
                          **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the service times of each of the bolt
        instances in the specified topology"""

    @abstractmethod
    def get_receive_counts(self, topology_id: str, cluster: str, environ: str,
                           start: dt.datetime, end: dt.datetime,
                           **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the receive counts of each of the bolt
        instances in the specified topology"""

    @abstractmethod
    def get_emit_counts(self, topology_id: str, cluster: str, environ: str,
                        start: dt.datetime, end: dt.datetime,
                        **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the emit count of each of the instances in
        the specified topology"""

    @abstractmethod
    def get_execute_counts(self, topology_id: str, cluster: str, environ: str,
                           start: dt.datetime, end: dt.datetime,
                           **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the execute count of each of the instances
        in the specified topology"""

    @abstractmethod
    def get_complete_latencies(self, topology_id: str, cluster: str,
                               environ: str, start: dt.datetime,
                               end: dt.datetime,
                               **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the complete latencies of each of the spout
        instances in the specified topology"""

    @abstractmethod
    def get_calculated_arrival_rates(
            self, topology_id: str, cluster: str, environ: str,
            start: dt.datetime, end: dt.datetime,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets a time series of the arrival rates, in units of tuples per
        second, for each of the instances in the specified topology"""

    @abstractmethod
    def get_incoming_queue_sizes(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets the size of the incoming queue for each component in the
        topology as a timeseries. The start and end times for the window
        over which to gather metrics, as well as the granularity of the
        time series can also be specified."""

    @abstractmethod
    def get_cpu_load(self, topology_id: str, cluster: str, environ: str,
                     start: Union[dt.datetime, None] = None,
                     end: Union[dt.datetime, None] = None,
                     **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets the CPU load for every running instance in the topology.
        This value is a double in the [0.0,1.0] interval. A value of 0.0
        means that none of the CPUs were running threads from the instance
        during the recent period of time observed, while a value of 1.0 means
        that all CPUs were actively running threads from the JVM
        100% of the time during the recent period being observed."""

    @abstractmethod
    def get_gc_time(self, topology_id: str, cluster: str, environ: str,
                    start: Union[dt.datetime, None] = None,
                    end: Union[dt.datetime, None] = None,
                    **kwargs: Union[str, int, float]) -> DataFrame:
        """ Gets the time spent in garbage collection for every running
        instance in the topology."""

    @abstractmethod
    def get_num_packets_received(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """ Retrieves the number of packets received (from the stream
        manager) per instance."""

    @abstractmethod
    def get_packet_arrival_rate(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """ Retrieves the number of packets received (from stream manager)
        per ms per instance."""

    @abstractmethod
    def get_tuple_arrivals_at_stmgr(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """ Retrieves the number of packets received at the stream manager
        for every local instance. We use this value to calculate
        inter-arrival times of packets."""

    @abstractmethod
    def get_end_to_end_latency(
            self, topology_id: str, cluster: str, environ: str, sink: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """This function is used to obtain end to end latency if the topology
        reports it. The user would currently have to update the topology's
        code to report such a metric.

        NOTE(review): ``sink`` presumably names the sink component whose
        reported latency is fetched -- confirm against the concrete clients.
        """

    @abstractmethod
    def get_outgoing_queue_processing_rate(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """Reports the number of tuples drained from the outgoing queue at
        each instance per minute"""

    # The "out_going" spelling differs from "outgoing" used by the sibling
    # method above; it is kept because the name is part of the public
    # interface that concrete clients implement.
    @abstractmethod
    def get_out_going_queue_arrival_rate(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """Reports the number of tuples added to the outgoing queue of the
        topology's instances per minute"""

    @abstractmethod
    def get_average_tuple_set_size_added_to_outgoing_queue(
            self, topology_id: str, cluster: str, environ: str,
            start: Union[dt.datetime, None] = None,
            end: Union[dt.datetime, None] = None,
            **kwargs: Union[str, int, float]) -> DataFrame:
        """Reports the number of tuples per tupleset that is added to the
        outgoing queue per minute of an instance"""