graph/analysis/heron/io_ratios.py (105 lines of code) (raw):
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains methods for calculating the input output (I/O) ratio
for the instances of a given topology. """
import logging
import datetime as dt
from typing import Dict, Union, List, Tuple
import pandas as pd
import numpy as np
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.graph.gremlin.client import GremlinClient
# Module-level logger, named after this module via __name__.
LOG: logging.Logger = logging.getLogger(__name__)
def get_in_out_components(graph_client: GremlinClient,
                          topology_id: str) -> List[str]:
    """ Finds the components of a topology that both receive and emit streams.

    The graph is queried for "bolt" vertices of the given topology that have
    at least one incoming and at least one outgoing "logically_connected"
    edge. The "component" property of each matching vertex is collected and
    duplicates are removed.

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        topology_id (str):  The topology identification string.

    Returns:
        A list of unique component name strings.
    """
    # Every bolt vertex that belongs to the requested topology
    topology_bolts = (graph_client.graph_traversal.V()
                      .hasLabel("bolt")
                      .has("topology_id", topology_id))

    # Follow each incoming logical edge and step back onto the vertex it
    # points at (the bolt itself), labelling it so it can be selected later
    bolts_with_input = (topology_bolts.inE("logically_connected")
                        .inV().as_("in_out"))

    # Only traversers that can also leave via an outgoing logical edge survive
    bolts_with_output = bolts_with_input.outE("logically_connected")

    # Jump back to the labelled vertex and read its component name
    component_names = (bolts_with_output.select("in_out")
                       .by("component").dedup())

    return component_names.toList()
def _resample_counts(counts: pd.DataFrame, bucket_length: int,
                     group_columns: List[str],
                     value_column: str) -> pd.DataFrame:
    """ Sums a count column into equal length time buckets for each task.

    Arguments:
        counts (pd.DataFrame):  The raw counts. Must contain "task" and
                                "timestamp" columns as well as all the
                                columns named in the other arguments.
        bucket_length (int):    The length in seconds of each time bucket.
        group_columns (list):   Additional column names to group by.
        value_column (str): The name of the column holding the counts to sum.

    Returns:
        pandas.DataFrame:   One row per task, time bucket and combination of
        the group columns, with the summed value column.
    """
    # The lowercase "s" alias is used because the uppercase "S" seconds alias
    # is deprecated in recent pandas releases
    return (counts.set_index(["task", "timestamp"])
            .groupby([pd.Grouper(level="task"),
                      pd.Grouper(freq=f"{bucket_length}s",
                                 level="timestamp")] + group_columns)
            [value_column]
            .sum().reset_index())


def lstsq_io_ratios(metrics_client: HeronMetricsClient,
                    graph_client: GremlinClient, topology_id: str,
                    cluster: str, environ: str,
                    start: dt.datetime, end: dt.datetime, bucket_length: int,
                    **kwargs: Union[str, int, float]) -> pd.DataFrame:
    """ This method will calculate the input/output ratio for each instance in
    the supplied topology using data aggregated from the defined period. The
    method uses least squares regression to calculate a coefficient for each
    input stream into an instance such that the total output amount for a
    given output stream is the sum of all input stream arrival amounts times
    their coefficient.

    *NOTE*: This method assumes that there is an (approximately) linear
    relationship between the inputs and outputs of a given component.

    Arguments:
        metrics_client (HeronMetricsClient):    The client instance for the
                                                metrics database.
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        topology_id (str):  The topology identification string.
        cluster (str):  The cluster identifier, passed through to the metrics
                        client queries.
        environ (str):  The environment identifier, passed through to the
                        metrics client queries.
        start (dt.datetime):    The UTC datetime object for the start of the
                                metric gathering period.
        end (dt.datetime):  The UTC datetime object for the end of the metric
                            gathering period.
        bucket_length (int):    The length in seconds that the metrics should
                                be aggregated into. *NOTE*: For the least
                                squares regression to work the number of
                                buckets must exceed the highest number of input
                                streams into the component of the topology.
        **kwargs:   Additional keyword arguments that will be passed to the
                    metrics client object. Consult the documentation for the
                    specific metrics client being used.

    Returns:
        pandas.DataFrame:   A DataFrame with the following columns:

        * task: Task ID integer.
        * output_stream: The output stream name.
        * input_stream: The input stream name.
        * source_component: The name of the source component for the input
          stream.
        * coefficient: The value of the input amount coefficient for this
          output stream, inputs stream source component combination.

    Raises:
        Exception:  If no coefficients could be calculated for any instance
                    (the result would be an empty DataFrame).
    """

    LOG.info("Calculating instance input/output ratios using least squares "
             "regression for topology %s over a %d second window between %s "
             "and %s", topology_id, (end-start).total_seconds(),
             start.isoformat(), end.isoformat())

    emit_counts: pd.DataFrame = metrics_client.get_emit_counts(
        topology_id, cluster, environ, start, end, **kwargs)

    arrived_tuples: pd.DataFrame = metrics_client.get_tuple_arrivals_at_stmgr(
        topology_id, cluster, environ, start, end, **kwargs)

    execute_counts: pd.DataFrame = metrics_client.get_execute_counts(
        topology_id, cluster, environ, start, end, **kwargs)

    # Join the arrivals with the execute count rows and then discard the
    # execute count itself.
    # NOTE(review): presumably this merge is what attaches the per-stream
    # columns to the arrival counts -- confirm against the metrics client.
    arrived_tuples = (
        arrived_tuples.merge(execute_counts,
                             on=["task", "component", "container",
                                 "timestamp"])
        .drop("execute_count", axis=1))

    # Limit the count DataFrames to only those component with both incoming and
    # outgoing streams. The renames return new frames rather than modifying
    # the filtered slices in place, which avoids chained assignment warnings.
    in_out_comps: List[str] = get_in_out_components(graph_client, topology_id)

    emit_counts = (emit_counts[emit_counts["component"].isin(in_out_comps)]
                   .rename(columns={"stream": "outgoing_stream"}))

    arrived_tuples = (
        arrived_tuples[arrived_tuples["component"].isin(in_out_comps)]
        .rename(columns={"stream": "incoming_stream"}))

    # Re-sample the counts into equal length time buckets and group by task id,
    # time bucket and stream. This aligns the two DataFrames with timestamps of
    # equal length and start point so they can be matched up later
    emit_counts_ts: pd.DataFrame = _resample_counts(
        emit_counts, bucket_length, ["component", "outgoing_stream"],
        "emit_count")

    arrived_tuples_ts: pd.DataFrame = _resample_counts(
        arrived_tuples, bucket_length,
        ["component", "incoming_stream", "source_component"], "num-tuples")

    rows: List[Dict[str, Union[str, float]]] = []

    # Now we loop through each component and line up, for every task and time
    # bucket, the output total of each output stream with the input totals of
    # every input stream. The grouping key is a scalar (not a one element
    # list) so that the group label is the plain component name on every
    # pandas version.
    for component, in_data in arrived_tuples_ts.groupby("component"):

        # One row per (task, time bucket) and one column per
        # (incoming stream, source component) pair
        in_wide: pd.DataFrame = (
            in_data.set_index(["task", "timestamp", "incoming_stream",
                               "source_component"])
            ["num-tuples"]
            .unstack(level=["incoming_stream", "source_component"]))

        in_cols: List[Tuple[str, str]] = list(in_wide.columns)

        # Split the input table per task, each indexed by time bucket only
        inputs_by_task: Dict[int, pd.DataFrame] = {
            task_id: task_frame.reset_index(level="task", drop=True)
            for task_id, task_frame in in_wide.groupby(level="task")}

        out_data: pd.DataFrame = \
            emit_counts_ts[emit_counts_ts["component"] == component]

        for (task, out_stream), out_counts in out_data.groupby(
                ["task", "outgoing_stream"]):

            task_inputs = inputs_by_task.get(task)
            if task_inputs is None:
                # No arrival data for this task so there is nothing to
                # regress against
                continue

            emitted: pd.Series = out_counts.set_index("timestamp")["emit_count"]

            # Only time buckets present in both the input and output data
            # can be used (equivalent to an inner join on task and timestamp)
            shared_buckets = emitted.index.intersection(task_inputs.index)
            if len(shared_buckets) == 0:
                continue

            LOG.debug("Processing instance %d output stream %s", task,
                      out_stream)

            # Get a series of the output counts for this output stream, these
            # are the dependent variables (b) of the least squares regression
            # a x = b
            output_counts: pd.Series = emitted.loc[shared_buckets]

            # If this instance's component has output stream registered that
            # nothing else subscribes to then the emit count will be zero and
            # we can skip this output stream
            if output_counts.sum() <= 0.0:
                LOG.debug("No emissions from instance %d on stream %s, "
                          "skipping this stream...", task, out_stream)
                continue

            # Get just the input stream counts for each time bucket. This is
            # the coefficients matrix (a) of the least squares regression
            # a x = b
            input_counts: pd.DataFrame = task_inputs.loc[shared_buckets]

            if len(shared_buckets) < len(in_cols):
                # With fewer equations than unknowns lstsq still returns a
                # (minimum norm) solution, but it is not uniquely determined
                LOG.warning("Only %d time buckets are available for the %d "
                            "input streams of instance %d, the calculated "
                            "coefficients are under-determined",
                            len(shared_buckets), len(in_cols), task)

            coeffs, _, _, _ = np.linalg.lstsq(np.asarray(input_counts),
                                              np.asarray(output_counts),
                                              rcond=None)

            for (in_stream, source), coefficient in zip(in_cols, coeffs):
                rows.append({
                    "task": task,
                    "output_stream": out_stream,
                    "input_stream": in_stream,
                    "source_component": source,
                    "coefficient": coefficient})

    result = pd.DataFrame(rows)

    if result.empty:
        raise Exception("lstsq_io_ratios returns an empty dataframe")

    return result