# tools/heron/tracker_stats.py
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" Script for displaying statistics for all topologies registered with a
Heron Tracker instance."""
import os
import sys
import logging
import argparse
import pickle
import datetime as dt
from typing import Any, Dict, List, Optional, Union

import requests
import pandas as pd

from caladrius import logs
from caladrius.common.heron import tracker
from caladrius.metrics.heron.topology import groupings

LOG: logging.Logger = logging.getLogger("caladrius.tools.heron.tracker_stats")

def summarise_groupings(tracker_url: str,
                        topologies: Optional[pd.DataFrame] = None
                        ) -> pd.DataFrame:
""" Summarises the stream grouping counts of all topologies registered with
the supplied Tracker instance.
Arguments:
tracker_url (str): The URL for the Heron Tracker API
        topologies (pd.DataFrame): The topologies summary from the Heron
                                   Tracker can be supplied; if not, it will
                                   be fetched fresh from the Tracker API.
Returns:
A DataFrame with columns for:
topology: The topology ID
cluster: The cluster the topology is running on
environ: The environment the topology is running in
user: The user that uploaded the topology
        A column for each type of stream grouping as well as combinations of
        stream grouping (incoming grouping)->(outgoing grouping) and their
        associated frequency count for each topology.
"""
if topologies is None:
topologies = tracker.get_topologies(tracker_url)
    output: Optional[pd.DataFrame] = None
for (cluster, environ), data in topologies.groupby(["cluster", "environ"]):
for topology in data.topology:
try:
grouping_summary: Dict[str, int] = \
groupings.summary(tracker_url, topology, cluster, environ)
except requests.HTTPError:
LOG.warning("Unable to fetch grouping summary for topology: "
"%s, cluster: %s, environ: %s", topology, cluster,
environ)
else:
grouping_summary["topology"] = topology
grouping_summary["cluster"] = cluster
grouping_summary["environ"] = environ
grouping_df: pd.DataFrame = pd.DataFrame([grouping_summary])
if output is None:
output = grouping_df
else:
output = output.append(grouping_df)
    output = output.merge(topologies, on=["topology", "environ", "cluster"])
return output
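
# A minimal usage sketch (hedged: assumes a Tracker instance is reachable at
# the placeholder URL below):
#
#   grouping_counts = summarise_groupings("http://heron-tracker:8888")
#   print(grouping_counts[["topology", "cluster", "environ"]].head())
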
def add_pplan_info(tracker_url: str,
                   topologies: Optional[pd.DataFrame] = None
                   ) -> pd.DataFrame:
""" Combines information from the topology summary DataFrame with
information from the physical plan of each topology.
Arguments:
tracker_url (str): The URL for the Heron Tracker API
        topologies (pd.DataFrame): The topologies summary from the Heron
                                   Tracker can be supplied; if not, it will
                                   be fetched fresh from the Tracker API.
Returns:
pandas.DataFrame: The topologies summary DataFrame with physical plan
information added. This will return a new DataFrame and will not modify
the supplied DataFrame
"""
if topologies is None:
topologies = tracker.get_topologies(tracker_url)
output: List[Dict[str, Union[str, float, List[int]]]] = []
for (cluster, environ, user), data in topologies.groupby(["cluster",
"environ",
"user"]):
for topology_id in data.topology:
try:
pplan: Dict[str, Any] = tracker.get_physical_plan(
tracker_url, cluster, environ, topology_id)
except requests.HTTPError:
# If we cannot fetch the plan, skip this topology
continue
# Add information from the configuration dictionary
config: Dict[str, str] = pplan["config"]
row: Dict[str, Union[str, float, List[int]]] = {}
row["topology"] = topology_id
row["cluster"] = cluster
row["environ"] = environ
row["user"] = user
for key, value in config.items():
# Some of the custom config values are large dictionaries or
# lists so we will skip them
if isinstance(value, (dict, list)):
continue
                # Drop the first namespace component of the key and join the
                # rest with "_" so we can use attribute access on the
                # DataFrame
                new_key: str = "_".join(key.split(".")[1:])
                # Try to convert any numeric values so we can do summary
                # stats
                try:
                    new_value: Union[str, float] = float(value)
                except ValueError:
                    new_value = value
                except TypeError:
                    LOG.error("Value of key: %s was not a string or number, "
                              "it was a %s", key, str(type(value)))
                    # Skip this key, otherwise new_value would be unbound
                    continue
                row[new_key] = new_value
# Add instances stats for this topology
row["total_instances"] = len(pplan["instances"])
row["instances_per_container_dist"] = \
[len(pplan["stmgrs"][stmgr]["instance_ids"])
for stmgr in pplan["stmgrs"]]
row["total_bolts"] = len(pplan["bolts"])
row["total_spouts"] = len(pplan["spouts"])
row["total_components"] = len(pplan["bolts"]) + len(pplan["spouts"])
output.append(row)
return pd.DataFrame(output)
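
# A minimal usage sketch (hedged: the URL is a placeholder). The returned
# DataFrame has one row per topology with config and instance statistics:
#
#   pplan_summary = add_pplan_info("http://heron-tracker:8888")
#   print(pplan_summary.total_instances.describe())
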
def add_logical_plan_info(tracker_url: str,
                          topologies: Optional[pd.DataFrame] = None
                          ) -> pd.DataFrame:
""" Combines information from the topology summary DataFrame with
information from the logical plan of each topology.
Arguments:
tracker_url (str): The URL for the Heron Tracker API
        topologies (pd.DataFrame): The topologies summary from the Heron
                                   Tracker can be supplied; if not, it will
                                   be fetched fresh from the Tracker API.
Returns:
pandas.DataFrame: The topologies summary DataFrame with logical plan
information added. This will return a new DataFrame and will not modify
the supplied DataFrame
"""
if topologies is None:
topologies = tracker.get_topologies(tracker_url)
output: List[Dict] = []
for (cluster, environ, user), data in topologies.groupby(["cluster",
"environ",
"user"]):
for topology_id in data.topology:
try:
logical_plan: Dict[str, Any] = tracker.get_logical_plan(
tracker_url, cluster, environ, topology_id)
except requests.HTTPError:
# If we cannot fetch the plan, skip this topology
continue
# there are two possible kinds of spouts:
spout_single_output = 0
spout_multiple_output = 0
# there are six possible kinds of bolts:
# two of which are sinks, and four are intermediate bolts
# sink types
bolt_single_in_zero_out = 0
bolt_multiple_in_zero_out = 0
# intermediate types:
bolt_single_in_single_out = 0
bolt_multiple_in_single_out = 0
bolt_single_in_multiple_out = 0
bolt_multiple_in_multiple_out = 0
row: Dict = {}
LOG.info("Topology ID: %s", topology_id)
            for spout in logical_plan["spouts"].values():
                if len(spout["outputs"]) == 1:
                    spout_single_output += 1
                else:
                    spout_multiple_output += 1
            for bolt in logical_plan["bolts"].values():
                outputs = len(bolt["outputs"])
                inputs = len(bolt["inputs"])
                # sinks
                if outputs == 0:
                    if inputs == 1:
                        bolt_single_in_zero_out += 1
                    elif inputs > 1:
                        bolt_multiple_in_zero_out += 1
                elif outputs == 1:
                    if inputs == 1:
                        bolt_single_in_single_out += 1
                    elif inputs > 1:
                        bolt_multiple_in_single_out += 1
                elif outputs > 1:
                    if inputs == 1:
                        bolt_single_in_multiple_out += 1
                    elif inputs > 1:
                        bolt_multiple_in_multiple_out += 1
row["topology"] = topology_id
row["spout_single_output"] = spout_single_output
row["spout_multiple_output"] = spout_multiple_output
row["bolt_single_in_zero_out"] = bolt_single_in_zero_out
row["bolt_multiple_in_zero_out"] = bolt_multiple_in_zero_out
row["bolt_single_in_single_out"] = bolt_single_in_single_out
row["bolt_multiple_in_single_out"] = bolt_multiple_in_single_out
row["bolt_single_in_multiple_out"] = bolt_single_in_multiple_out
row["bolt_multiple_in_multiple_out"] = bolt_multiple_in_multiple_out
output.append(row)
return pd.DataFrame(output)
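
# A minimal usage sketch (hedged: the URL is a placeholder). Each row counts
# the spout/bolt input-output patterns for one topology:
#
#   lplan_summary = add_logical_plan_info("http://heron-tracker:8888")
#   print(lplan_summary[["topology", "spout_single_output"]].head())
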
def _get_mg_summary(topo_pplan: pd.DataFrame,
                    groupby_term: str) -> pd.DataFrame:
    """ Summarises the message guarantee (reliability mode) counts of the
    supplied physical plan DataFrame, grouped by the given term (for example
    "cluster" or "environ"), with percentages of the overall total and of
    each group's total. """
    mg_summary: pd.DataFrame = pd.DataFrame(
        topo_pplan.groupby([groupby_term,
                            "reliability_mode"]).topology.count())
mg_summary = mg_summary.reset_index()
mg_summary.rename(index=str,
columns={"topology": f"mg_{groupby_term}_count"},
inplace=True)
mg_summary = mg_summary.merge(
(mg_summary.groupby(groupby_term)[f"mg_{groupby_term}_count"].sum()
.reset_index().rename(
index=str,
columns={f"mg_{groupby_term}_count":
f"mg_{groupby_term}_total"})),
on=groupby_term)
mg_summary["overall_percentage"] = \
(mg_summary[f"mg_{groupby_term}_count"] /
topo_pplan.reliability_mode.count()
* 100)
mg_summary[f"{groupby_term}_percentage"] = \
(mg_summary[f"mg_{groupby_term}_count"] /
mg_summary[f"mg_{groupby_term}_total"] * 100)
return mg_summary
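
# A minimal usage sketch (hedged: assumes `topo_pplan` is the DataFrame
# returned by add_pplan_info, which carries a "reliability_mode" column):
#
#   cluster_mg = _get_mg_summary(topo_pplan, "cluster")
#   print(cluster_mg[["cluster", "reliability_mode", "overall_percentage"]])
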
def _create_parser() -> argparse.ArgumentParser:
parser: argparse.ArgumentParser = argparse.ArgumentParser(
description=("This program provides various statistics about the "
"topologies running on a Heron Tracker instance"))
parser.add_argument("-t", "--tracker", required=False, type=str,
help=("The URL to the Heron Tracker API"))
parser.add_argument("-r", "--reload", required=False, action="store_true",
help=("If supplied then fresh information will be "
"pulled from the Heron Tracker API. Otherwise "
"cached data will be used."))
parser.add_argument("-cd", "--cache_dir", required=False, type=str,
default="/tmp/caladrius/heron/tracker/stats",
help=("The temporary directory to store Tracker "
"information."))
parser.add_argument("-o", "--output", required=False, type=str,
help=("Output file path for the statistic summary to "
"be printed to. If not supplied out will be "
"sent to standard out."))
parser.add_argument("-q", "--quiet", required=False, action="store_true",
help=("Optional flag indicating if console log output "
"should be suppressed"))
parser.add_argument("--debug", required=False, action="store_true",
help=("Optional flag indicating if debug level "
"information should be displayed"))
return parser
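
# A minimal usage sketch (parses arguments without touching the network):
#
#   args = _create_parser().parse_args(["-t", "heron-tracker:8888", "-r"])
#   assert args.reload and args.tracker == "heron-tracker:8888"
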
def _check_tracker(tracker_str: str) -> str:
    """ Ensures the supplied tracker URL starts with a scheme, defaulting to
    http:// if none is present. """
    if tracker_str.startswith(("http://", "https://")):
        return tracker_str
    return "http://" + tracker_str
if __name__ == "__main__":
ARGS: argparse.Namespace = _create_parser().parse_args()
if not ARGS.quiet:
logs.setup(debug=ARGS.debug)
NO_CACHE_DIR: bool = False
if not os.path.exists(ARGS.cache_dir):
LOG.info("No cached tracker information present")
NO_CACHE_DIR = True
os.makedirs(ARGS.cache_dir)
LOG.info("Created cache directory at: %s", ARGS.cache_dir)
elif len(os.listdir(ARGS.cache_dir)) < 1:
LOG.info("No cached tracker information present")
# If the directory exists but there is nothing in it then we need to
# load the tracker data
NO_CACHE_DIR = True
CREATE_TIME_FILE: str = os.path.join(ARGS.cache_dir, "created.pkl")
TOPO_FILE: str = os.path.join(ARGS.cache_dir, "topo.pkl")
TOPO_PPLAN_FILE: str = os.path.join(ARGS.cache_dir, "topo_pplan.pkl")
TOPO_LPLAN_FILE: str = os.path.join(ARGS.cache_dir, "topo_lplan.pkl")
GROUPING_SUMMARY_FILE: str = os.path.join(ARGS.cache_dir,
"grouping_summary.pkl")
if ARGS.reload or NO_CACHE_DIR:
if not ARGS.tracker:
err_msg: str = ("In order to load information from the Tracker API"
" the tracker URL must be supplied via the "
"-t / --tracker argument")
if ARGS.quiet:
print(err_msg, file=sys.stderr)
else:
LOG.error(err_msg)
            # Exit with error code 2 (usually indicates a CLI syntax error)
sys.exit(2)
TRACKER_URL: str = _check_tracker(ARGS.tracker)
LOG.info("Fetching new data from the Heron Tracker API at %s",
TRACKER_URL)
# Get the list of topologies registered with the heron tracker
TOPOLOGIES: pd.DataFrame = tracker.get_topologies(TRACKER_URL)
LOG.info("Caching topology data at %s", TOPO_FILE)
TOPOLOGIES.to_pickle(TOPO_FILE)
# Add physical plan options
TOPO_PPLAN: pd.DataFrame = add_pplan_info(TRACKER_URL, TOPOLOGIES)
LOG.info("Caching topology physical plan data at %s",
TOPO_PPLAN_FILE)
TOPO_PPLAN.to_pickle(TOPO_PPLAN_FILE)
# Add logical plan information
TOPO_LPLAN: pd.DataFrame = add_logical_plan_info(TRACKER_URL, TOPOLOGIES)
LOG.info("Caching topology logical plan data at %s",
TOPO_LPLAN_FILE)
TOPO_LPLAN.to_pickle(TOPO_LPLAN_FILE)
# Get the stream grouping summary
GROUPING_SUMMARY: pd.DataFrame = summarise_groupings(TRACKER_URL,
TOPOLOGIES)
LOG.info("Caching stream grouping summary data at %s",
GROUPING_SUMMARY_FILE)
GROUPING_SUMMARY.to_pickle(GROUPING_SUMMARY_FILE)
# Save a time stamp so we know how old the data is
NOW: dt.datetime = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
with open(CREATE_TIME_FILE, 'wb') as time_file:
pickle.dump(NOW, time_file)
else:
with open(CREATE_TIME_FILE, 'rb') as time_file:
timestamp: dt.datetime = pickle.load(time_file)
NOW = dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)
duration: dt.timedelta = NOW - timestamp
LOG.info("Loading data that is %s hours old",
str(round(duration.total_seconds() / 3600, 2)))
LOG.info("Loading topology data from %s", TOPO_FILE)
TOPOLOGIES = pd.read_pickle(TOPO_FILE)
LOG.info("Loading topology physical plan data from %s",
TOPO_PPLAN_FILE)
TOPO_PPLAN = pd.read_pickle(TOPO_PPLAN_FILE)
LOG.info("Loading topology logical plan data from %s",
TOPO_PPLAN_FILE)
TOPO_LPLAN = pd.read_pickle(TOPO_LPLAN_FILE)
LOG.info("Loading stream grouping summary data from %s",
GROUPING_SUMMARY_FILE)
GROUPING_SUMMARY = pd.read_pickle(GROUPING_SUMMARY_FILE)
# Overall topology counts
TOTAL_TOPOS: int = TOPOLOGIES.topology.count()
TOPOS_BY_CLUSTER: pd.DataFrame = pd.DataFrame(
TOPOLOGIES.groupby("cluster").topology.count())
TOPOS_BY_CLUSTER["percentage"] = ((TOPOS_BY_CLUSTER.topology /
TOTAL_TOPOS) * 100)
TOPOS_BY_CLUSTER = TOPOS_BY_CLUSTER.reset_index()
TOPOS_BY_CLUSTER.rename(index=str,
columns={"topology": "topo_cluster_count"},
inplace=True)
TOPOS_BY_ENV: pd.DataFrame = pd.DataFrame(
TOPOLOGIES.groupby("environ").topology.count())
TOPOS_BY_ENV["percentage"] = ((TOPOS_BY_ENV.topology /
TOTAL_TOPOS) * 100)
TOPOS_BY_ENV = TOPOS_BY_ENV.reset_index()
MG_TOTAL_TOPOS: int = TOPO_PPLAN.reliability_mode.count()
# Message Guarantee stats
# Overall
MG_OVERALL: pd.Series = \
(TOPO_PPLAN.groupby("reliability_mode").topology.count())
MG_OVERALL = MG_OVERALL.reset_index()
MG_OVERALL.rename(index=str, columns={"topology": "mg_overall_count"},
inplace=True)
MG_OVERALL["percentage"] = (MG_OVERALL.mg_overall_count /
MG_TOTAL_TOPOS * 100)
# By Cluster
MG_BY_CLUSTER: pd.DataFrame = _get_mg_summary(TOPO_PPLAN, "cluster")
# By Environment
MG_BY_ENV: pd.DataFrame = _get_mg_summary(TOPO_PPLAN, "environ")
# Topology structure stats
    LINEAR_TOPOS = \
        (TOPO_LPLAN[(TOPO_LPLAN['spout_multiple_output'] == 0) &
                    (TOPO_LPLAN['bolt_multiple_in_zero_out'] == 0) &
                    (TOPO_LPLAN['bolt_multiple_in_multiple_out'] == 0) &
                    (TOPO_LPLAN['bolt_multiple_in_single_out'] == 0) &
                    (TOPO_LPLAN['bolt_single_in_multiple_out'] == 0)]
         .shape[0] / TOTAL_TOPOS * 100)
    # We look for the most common types of operator
    SPOUT_MULTIPLE_OUTPUTS = \
        (TOPO_LPLAN[TOPO_LPLAN['spout_multiple_output'] > 0].shape[0] /
         TOTAL_TOPOS * 100)
    SINK_MULTIPLE_INPUTS = \
        (TOPO_LPLAN[TOPO_LPLAN['bolt_multiple_in_zero_out'] > 0].shape[0] /
         TOTAL_TOPOS * 100)
    BOLT_MULTIPLE_IN_OUT = \
        (TOPO_LPLAN[TOPO_LPLAN['bolt_multiple_in_multiple_out'] > 0]
         .shape[0] / TOTAL_TOPOS * 100)
    BOLT_MULTIPLE_IN_SINGLE_OUT = \
        (TOPO_LPLAN[TOPO_LPLAN['bolt_multiple_in_single_out'] > 0]
         .shape[0] / TOTAL_TOPOS * 100)
    BOLT_SINGLE_IN_MULTIPLE_OUT = \
        (TOPO_LPLAN[TOPO_LPLAN['bolt_single_in_multiple_out'] > 0]
         .shape[0] / TOTAL_TOPOS * 100)
# We look for topologies that have both of the most common non-linear operator types:
COMMON = TOPO_LPLAN[(TOPO_LPLAN['bolt_multiple_in_single_out'] > 0) &
(TOPO_LPLAN['bolt_multiple_in_zero_out'] > 0)].reset_index()
# Grouping stats
JUST_GROUPINGS: pd.DataFrame = \
(GROUPING_SUMMARY.drop(["topology", "cluster", "environ", "user"],
axis=1))
GROUPING_OVERALL: pd.Series = \
(JUST_GROUPINGS.count() / TOTAL_TOPOS * 100)
    GROUPING_ENVIRON: pd.DataFrame = \
        (GROUPING_SUMMARY.groupby(["environ"]).count() / TOTAL_TOPOS * 100)
SINGLE_GROUPING: List[Dict[str, Union[int, float]]] = []
for grouping in [g for g in list(JUST_GROUPINGS.columns) if "->" not in g]:
grouping_only_count: int = \
(JUST_GROUPINGS[JUST_GROUPINGS[grouping].notna() &
(JUST_GROUPINGS.isna().sum(axis=1) ==
len(JUST_GROUPINGS.columns) - 1)]
[grouping].count())
SINGLE_GROUPING.append({"Grouping": grouping,
"Frequency": grouping_only_count,
"% of Total":
(grouping_only_count / TOTAL_TOPOS * 100)})
PERCENTILES: List[float] = [.10, .25, .5, .75, .95, .99]
if ARGS.output:
LOG.info("Saving output to file: %s", ARGS.output)
OUT_FILE = open(ARGS.output, "w")
else:
LOG.info("Sending output to standard out")
OUT_FILE = sys.stdout
print("-------------------", file=OUT_FILE)
print("Heron Tracker Stats", file=OUT_FILE)
print("-------------------", file=OUT_FILE)
print(f"\nTotal topologies: {TOTAL_TOPOS}", file=OUT_FILE)
print("\nTotal topologies by cluster:\n", file=OUT_FILE)
print(TOPOS_BY_CLUSTER.to_string(index=False), file=OUT_FILE)
print("\nTotal topologies by environment:\n", file=OUT_FILE)
print(TOPOS_BY_ENV.to_string(index=False), file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Container stats:\n", file=OUT_FILE)
print(TOPO_PPLAN.stmgrs.describe(percentiles=PERCENTILES).to_string(),
file=OUT_FILE)
print("\nTop 10 Largest topologies by container count:\n", file=OUT_FILE)
print(TOPO_PPLAN.sort_values(by="stmgrs", ascending=False)
[["topology", "cluster", "environ", "user", "total_instances",
"stmgrs"]]
.head(10).to_string(index=False), file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Instance stats:\n", file=OUT_FILE)
print("\nStatistics for total number of instances per topology:\n",
file=OUT_FILE)
print(TOPO_PPLAN.total_instances.describe(
percentiles=PERCENTILES).to_string(), file=OUT_FILE)
print("\nTop 10 Largest topologies by instance count:\n", file=OUT_FILE)
print(TOPO_PPLAN.sort_values(by="total_instances", ascending=False)
[["topology", "cluster", "environ", "user", "total_instances",
"stmgrs"]].head(10).to_string(index=False), file=OUT_FILE)
print("\nStatistics for instances per container:\n", file=OUT_FILE)
    output: List[int] = []
    for _, dist in TOPO_PPLAN.instances_per_container_dist.iteritems():
        output.extend(dist)
print(pd.Series(output).describe(percentiles=PERCENTILES).to_string(),
file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Component stats:\n", file=OUT_FILE)
print("\nStatistics for total number of components per topology:\n",
file=OUT_FILE)
print(TOPO_PPLAN.total_components.describe(
percentiles=PERCENTILES).to_string(), file=OUT_FILE)
print("\nTop 20 Largest topologies by component count:\n", file=OUT_FILE)
print(TOPO_PPLAN.sort_values(by="total_components", ascending=False)
[["topology", "cluster", "environ", "user", "total_components",
"stmgrs", "total_spouts", "total_bolts", "total_instances"]]
.head(20).to_string(index=False), file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Topology structure stats:\n", file=OUT_FILE)
print("\nPercentage of topologies with linear structure: ",
LINEAR_TOPOS, file=OUT_FILE)
print("\nPercentage of topologies with spouts with multiple outputs: ",
SPOUT_MULTIPLE_OUTPUTS, file=OUT_FILE)
print("\nPercentage of topologies with sinks with multiple inputs: ",
SINK_MULTIPLE_INPUTS, file=OUT_FILE)
print("\nPercentage of topologies with bolts with multiple inputs"
" and outputs: ",
BOLT_MULTIPLE_IN_OUT, file=OUT_FILE)
print("\nPercentage of topologies with bolts with multiple inputs"
" and a single output: ",
BOLT_MULTIPLE_IN_SINGLE_OUT, file=OUT_FILE)
print("\nPercentage of topologies with bolts with single input "
"and multiple outputs: ",
BOLT_SINGLE_IN_MULTIPLE_OUT, file=OUT_FILE)
print("\nPercentage of topologies with both intermediate bolts with"
" multiple inputs and sinks with multiple inputs: ",
COMMON.shape[0]/TOTAL_TOPOS * 100, file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Message guarantee stats:\n", file=OUT_FILE)
print("\nTopologies with each message guarantee type - Overall\n",
file=OUT_FILE)
print(MG_OVERALL.to_string(), file=OUT_FILE)
print("\nTopologies with each message guarantee type by Cluster\n",
file=OUT_FILE)
print(MG_BY_CLUSTER.set_index(["cluster",
"reliability_mode"]).to_string(),
file=OUT_FILE)
print("\nTopologies with each message guarantee type by Environment\n",
file=OUT_FILE)
print(MG_BY_ENV.set_index(["environ", "reliability_mode"]).to_string(),
file=OUT_FILE)
print("\n-------------------", file=OUT_FILE)
print("Stream grouping stats:\n", file=OUT_FILE)
print("\nPercentage of topologies with each grouping - Overall:\n",
file=OUT_FILE)
print(GROUPING_OVERALL.to_string(), file=OUT_FILE)
print("\nPercentage of topologies with each grouping - Per Environment:\n",
file=OUT_FILE)
print(GROUPING_ENVIRON.drop(["topology", "cluster", "user"],
axis=1).to_string(), file=OUT_FILE)
print("\nTopologies with only a single grouping type:\n", file=OUT_FILE)
    print(pd.DataFrame(SINGLE_GROUPING).to_string(index=False), file=OUT_FILE)
    if ARGS.output:
        OUT_FILE.close()