# model/traffic/heron/stats_summary.py
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains classes and methods for modelling traffic based on
summaries of historic spout emission metrics. """
import logging
import datetime as dt
from typing import List, Dict, Any, Union, cast, DefaultDict
from collections import defaultdict
import pandas as pd
from caladrius.common.timestamp import calculate_ts_period
from caladrius.model.traffic.heron.base import HeronTrafficModel
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.graph.gremlin.client import GremlinClient

LOG: logging.Logger = logging.getLogger(__name__)
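
# Type alias for a mapping from output stream name to a summary statistic
# value.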
SUMMARY_DICT = Dict[str, float]


class StatsSummaryTrafficModel(HeronTrafficModel):
""" This model provides summary statistics for the spout instances emit
metrics (traffic)."""
name: str = "stats_summary"
description: str = ("Provides summary traffic statistics for the "
"specified topology. Statistics are based on emit "
"count metrics from the topologies spout instances.")

    def __init__(self, config: dict, metrics_client: HeronMetricsClient,
                 graph_client: GremlinClient) -> None:
super().__init__(config, metrics_client, graph_client)
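        # Narrow the type of the client attribute set by the base class so
        # type checkers know this model uses the Heron metrics client.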
self.metrics_client: HeronMetricsClient
if "stats.summary.model.default.source.hours" in config:
self.default_source_hours: float = \
config["stats.summary.model.default.source.hours"]
else:
LOG.warning("Default source hours were not supplied via "
"configuration file. Setting to 3 hours.")
self.default_source_hours = 3
if "stats.summary.model.quantiles" in config:
self.quantiles: List[int] = config["stats.summary.model.quantiles"]
else:
LOG.warning("Quantile values were not set via configuration file "
" using; 10, 90, 95, 99 as defaults.")
self.quantiles = [10, 90, 95, 99]

    def predict_traffic(self, topology_id: str, cluster: str, environ: str,
                        **kwargs: Union[str, int, float]) -> Dict[str, Any]:
""" This method will provide a summary of the emit counts from the
spout instances of the specified topology. It will summarise the emit
metrics over the number of hours defined by the source_hours keyword
argument and provide summary statistics (mean, median, min, max and
quantiles) over all instances of each component and for each individual
instance.
Arguments:
topology_id (str): The topology ID string
**source_hours (int): Optional keyword argument for the number of
hours (backwards from now) of metrics data
to summarise.
**metric_sample_period (float): This is an optional argument
specifying the period (seconds) of
the metrics returned by the metrics
client.

        Returns:
            dict:   A dictionary with top level keys for "details" (the
            summary window and metric period), "components" (summary
            statistics for each spout component) and "instances" (summary
            statistics for each individual instance of each spout component).

        The dictionary has the form:

            ["components"]
                [statistic_name]
                    [component_name]
                        [output_stream_name] = emit rate (tuples per second)
            ["instances"]
                [statistic_name]
                    [task_id]
                        [output_stream_name] = emit rate (tuples per second)
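
        For example (illustrative names and values only):

            {"components": {"mean": {"word-spout": {"default": 150.0}}},
             "instances": {"mean": {"1": {"default": 75.5}}}}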

        All dictionary keys are strings to allow easy conversion to JSON.
"""
if "source_hours" not in kwargs:
LOG.warning("source_hours parameter (indicating how many hours of "
"historical data to summarise) was not provided, "
"using default value of %d hours",
self.default_source_hours)
source_hours: float = self.default_source_hours
else:
source_hours = cast(float, float(kwargs["source_hours"]))
LOG.info("Predicting traffic for topology %s using statistics summary "
"model", topology_id)
end: dt.datetime = dt.datetime.now(dt.timezone.utc)
start: dt.datetime = end - dt.timedelta(hours=source_hours)
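        # Query the graph database for the distinct component names of all
        # spout components in this topology.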
spout_comps: List[str] = (self.graph_client.graph_traversal.V()
.has("topology_id", topology_id)
.hasLabel("spout").values("component")
.dedup().toList())
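        # Fetch emit counts for the whole topology, then keep only the rows
        # from spout components: only spouts introduce traffic into the
        # topology.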
emit_counts: pd.DataFrame = self.metrics_client.get_emit_counts(
topology_id, cluster, environ, start, end, **kwargs)
spout_emit_counts: pd.DataFrame = emit_counts[
emit_counts["component"].isin(spout_comps)]
if "metrics_sample_period" in kwargs:
time_period_sec: float = \
cast(float, float(kwargs["metrics_sample_period"]))
else:
            # TODO: This method needs to be made robust: interleaved unique
            # timestamps from different instances will give the wrong period.
# TODO: Maybe introduce aggregation time bucket into metric client
# methods to create known time period.
# TODO: Or just accept the timer frequency of the metrics as an
# argument? For TMaster it is always 1 min and for others this can
# be supplied in the kwargs passed to the metrics get method so it
# will be available.
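            # A possibly more robust alternative (a sketch only, not wired
            # in): take the median gap between consecutive timestamps *per
            # task*, so interleaved timestamps from different instances
            # cannot shrink the inferred period:
            #
            #   diffs = (spout_emit_counts.sort_values("timestamp")
            #            .groupby("task")["timestamp"].diff()
            #            .dt.total_seconds())
            #   time_period_sec = float(diffs.median())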
            time_period_sec = \
                calculate_ts_period(spout_emit_counts.timestamp)
LOG.info("Emit count data was calculated to have a period of %f "
"seconds", time_period_sec)
output: Dict[str, Any] = {}
output["details"] = {"start": start.isoformat(),
"end": end.isoformat(),
"source_hours": source_hours,
"original_metric_frequency_secs":
time_period_sec}
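        # Nested mapping: statistic name -> component name -> stream name ->
        # emit rate (tuples per second).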
components: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
defaultdict(lambda: defaultdict(dict))
for (comp, stream), comp_data in \
spout_emit_counts.groupby(["component", "stream"]):
LOG.debug("Processing component: %s stream: %s", comp, stream)
components["mean"][comp][stream] = \
(float(comp_data.emit_count.mean()) / time_period_sec)
components["median"][comp][stream] = \
(float(comp_data.emit_count.median()) / time_period_sec)
components["max"][comp][stream] = \
(float(comp_data.emit_count.max()) / time_period_sec)
components["min"][comp][stream] = \
(float(comp_data.emit_count.min()) / time_period_sec)
for quantile in self.quantiles:
components[f"{quantile}-quantile"][comp][stream] = \
(float(comp_data.emit_count.quantile(quantile/100)) /
time_period_sec)
output["components"] = components
instances: DefaultDict[str, DefaultDict[str, SUMMARY_DICT]] = \
defaultdict(lambda: defaultdict(dict))
for (task_id, stream), task_data in \
spout_emit_counts.groupby(["task", "stream"]):
LOG.debug("Processing instance: %d stream: %s", task_id, stream)
instances["mean"][str(task_id)][stream] = \
(float(task_data.emit_count.mean()) / time_period_sec)
instances["median"][str(task_id)][stream] = \
(float(task_data.emit_count.median()) / time_period_sec)
instances["max"][str(task_id)][stream] = \
(float(task_data.emit_count.max()) / time_period_sec)
instances["min"][str(task_id)][stream] = \
(float(task_data.emit_count.min()) / time_period_sec)
for quantile in self.quantiles:
instances[f"{quantile}-quantile"][str(task_id)][stream] = \
(float(task_data.emit_count.quantile(quantile/100)) /
time_period_sec)
output["instances"] = instances
return output
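

# ---------------------------------------------------------------------------
# Example usage (a minimal sketch: the configuration values, client instances
# and topology/cluster/environment names below are illustrative assumptions,
# not part of this module):
#
#   config = {"stats.summary.model.default.source.hours": 6,
#             "stats.summary.model.quantiles": [10, 90, 95, 99]}
#   model = StatsSummaryTrafficModel(config, metrics_client, graph_client)
#   traffic = model.predict_traffic("my-topology", "cluster1", "prod",
#                                   source_hours=2)
#   mean_rates = traffic["components"]["mean"]  # tuples/sec per stream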