api/model/topology/heron.py
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This is the rooting logic for the Apache Heron topology performance
modelling API """
import json
import logging
from typing import Any, Dict, List, Tuple, Type

import pandas as pd
from flask import request
from flask_restful import Resource

from caladrius.api import utils
from caladrius.graph.gremlin.client import GremlinClient
from caladrius.graph.utils.heron import graph_check, paths_check
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.model.topology.heron.base import HeronTopologyModel
from caladrius.model.topology.heron.queueing_theory import get_start_end_times
from caladrius.traffic_provider.predicted_traffic import PredictedTraffic
from caladrius.traffic_provider.current_traffic import CurrentTraffic

LOG: logging.Logger = logging.getLogger(__name__)


class HeronTopologyModels(Resource):
    """ Resource class for the Heron topology model information endpoint. """
def __init__(self, model_classes: List[Type]) -> None:
self.models_info: List[Dict[str, Any]] = []
for model in model_classes:
model_info: Dict[str, str] = dict()
model_info["name"] = model.name
model_info["description"] = model.description
            self.models_info.append(model_info)
        super().__init__()

    def get(self) -> List[Dict[str, Any]]:
        """ Returns the configured topology models as a list of model
        information dictionaries, each containing "name" and "description"
        keys. """
        return self.models_info
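
    # Illustrative response shape for this endpoint (the model name and
    # description below are assumptions; the actual values depend on the
    # model classes configured at start-up):
    #
    #     [{"name": "queueing_theory",
    #       "description": "Queueing theory model of end-to-end latency"}]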


class HeronCurrent(Resource):
    """ Resource class for modelling the performance of currently running
    Heron topologies. """
def __init__(self, model_classes: List[Type], model_config: Dict[str, Any],
metrics_client: HeronMetricsClient,
graph_client: GremlinClient, tracker_url: str) -> None:
self.metrics_client: HeronMetricsClient = metrics_client
self.graph_client: GremlinClient = graph_client
self.tracker_url: str = tracker_url
self.model_config: Dict[str, Any] = model_config
self.models: Dict[str, HeronTopologyModel] = {}
for model_class in model_classes:
model = model_class(model_config, metrics_client, graph_client)
self.models[model.name] = model
super().__init__()

    def post(self, topology_id: str) -> Tuple[Dict[str, Any], int]:
        """ Method handling POST requests to the current topology performance
        modelling endpoint. """
# Make sure we have the args we need
errors: List[Dict[str, str]] = []
if "cluster" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'cluster' parameter should be supplied"})
if "environ" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'environ' parameter should be supplied"})
if "model" not in request.args:
errors.append({"type": "MissingParameter",
"error": ("At least one 'model' parameter should "
"be supplied. Supply 'all' to run all "
"configured models")})
# Return useful errors to the client if any parameters are missing
if errors:
return {"errors": errors}, 400
LOG.info("Processing performance modelling request for topology: %s, "
"cluster: %s, environment: %s, using model: %s",
topology_id, request.args.get("cluster"),
request.args.get("environ"),
str(request.args.getlist("model")))
# Make sure we have a current graph representing the physical plan for
# the topology
try:
graph_check(self.graph_client, self.model_config, self.tracker_url,
request.args["cluster"], request.args["environ"],
topology_id)
except Exception as err:
LOG.error("Error running graph check for topology: %s -> %s",
topology_id, str(err))
errors.append({"topology": topology_id,
"type": str(type(err)),
"error": str(err)})
return {"errors": errors}, 400
        # Get the spout traffic state and convert the JSON string task IDs
        # to integers
        json_traffic: Dict[str, Dict[str, float]] = request.get_json()
        if json_traffic is None:
            errors.append({"type": "MissingPayload",
                           "error": ("A JSON payload mapping spout task IDs "
                                     "to traffic values should be supplied")})
            return {"errors": errors}, 400
        traffic: Dict[int, Dict[str, float]] = \
            {int(key): value for key, value in json_traffic.items()}
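        # For example, a request body of '{"1": {"emit_count": 150.0}}'
        # (task IDs arrive as JSON string keys) becomes
        # {1: {"emit_count": 150.0}} after the conversion above; the
        # "emit_count" metric name is illustrative only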
if "all" in request.args.getlist("model"):
LOG.info("Running all configured Heron topology performance "
"models")
models = self.models.keys()
else:
models = request.args.getlist("model")
# Convert the request.args to a dict suitable for passing as **kwargs
model_kwargs: Dict[str, Any] = \
utils.convert_wimd_to_dict(request.args)
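        # convert_wimd_to_dict is assumed to flatten the werkzeug
        # ImmutableMultiDict of query parameters into a plain dict, e.g.
        # ?cluster=prod&environ=devel&model=all becoming
        # {"cluster": "prod", "environ": "devel", "model": "all"}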
        # Remove the model, cluster and environ keys from the kwargs as they
        # are only needed by this method
model_kwargs.pop("model")
model_kwargs.pop("cluster")
model_kwargs.pop("environ")
cluster = request.args.get("cluster")
environ = request.args.get("environ")
output = {}
for model_name in models:
LOG.info("Running topology performance model %s", model_name)
model = self.models[model_name]
try:
results: pd.DataFrame = model.predict_current_performance(
topology_id=topology_id,
cluster=cluster,
environ=environ,
spout_traffic=traffic, **model_kwargs)
except Exception as err:
LOG.error("Error running model: %s -> %s", model.name,
str(err))
errors.append({"model": model.name, "type": str(type(err)),
"error": str(err)})
else:
output[model_name] = results.to_json()
if errors:
return {"errors": errors}, 500
return output, 200
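
    # A hypothetical invocation of the POST endpoint above (the host, port
    # and URL prefix are assumptions; routes are registered outside this
    # module):
    #
    #     curl -X POST -H "Content-Type: application/json" \
    #          -d '{"1": {"emit_count": 150.0}}' \
    #          "http://localhost:5000/model/topology/heron/current/my-topo?cluster=prod&environ=devel&model=all"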

    def get(self, topology_id: str) -> Tuple[Dict[str, Any], int]:
        """ Method handling GET requests for the currently running topology's
        end-to-end latency. """
# Make sure we have the args we need
errors: List[Dict[str, str]] = []
if "cluster" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'cluster' parameter should be supplied"})
if "environ" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'environ' parameter should be supplied"})
if "model" not in request.args:
errors.append({"type": "MissingParameter",
"error": ("At least one 'model' parameter should "
"be supplied. Supply 'all' to run all "
"configured models")})
# Return useful errors to the client if any parameters are missing
if errors:
return {"errors": errors}, 400
LOG.info("Processing performance modelling request for topology: %s, "
"cluster: %s, environment: %s, using model: %s", topology_id,
request.args.get("cluster"), request.args.get("environ"),
str(request.args.getlist("model")))
cluster = request.args.get("cluster")
environ = request.args.get("environ")
# Make sure we have a current graph representing the physical plan for
# the topology
graph_check(self.graph_client, self.model_config, self.tracker_url,
cluster, environ, topology_id)
# Make sure we have a file containing all paths for the job
paths_check(self.graph_client, self.model_config, cluster,
environ, topology_id)
if "all" in request.args.getlist("model"):
LOG.info("Running all configured Heron topology performance "
"models")
models = self.models.keys()
else:
models = request.args.getlist("model")
# Convert the request.args to a dict suitable for passing as **kwargs
model_kwargs: Dict[str, Any] = \
utils.convert_wimd_to_dict(request.args)
        # Remove the model, cluster and environ keys from the kwargs as they
        # are only needed by this method
model_kwargs.pop("model")
model_kwargs.pop("cluster")
model_kwargs.pop("environ")
model_kwargs["zk.time.offset"] = self.model_config["zk.time.offset"]
model_kwargs["heron.statemgr.root.path"] = self.model_config["heron.statemgr.root.path"]
model_kwargs["heron.statemgr.connection.string"] = self.model_config["heron.statemgr.connection.string"]
start, end = get_start_end_times(**model_kwargs)
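        # get_start_end_times (imported from the queueing_theory module) is
        # assumed to derive the metrics-gathering window from the remaining
        # kwargs; the exact keyword arguments it expects are defined there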
        traffic_provider: CurrentTraffic = CurrentTraffic(
            self.metrics_client, self.graph_client, topology_id, cluster,
            environ, start, end, {}, **model_kwargs)
output = {}
for model_name in models:
LOG.info("Running topology performance model %s", model_name)
model = self.models[model_name]
try:
                results: list = model.find_current_instance_waiting_times(
                    topology_id=topology_id, cluster=cluster,
                    environ=environ, traffic_source=traffic_provider,
                    start=start, end=end, **model_kwargs)
except Exception as err:
LOG.error("Error running model: %s -> %s", model.name,
str(err))
errors.append({"model": model.name, "type": str(type(err)),
"error": str(err)})
else:
output[model_name] = json.dumps(results)
if errors:
return {"errors": errors}, 500
return output, 200
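
    # A hypothetical invocation of the GET endpoint above (host, port and
    # URL prefix are assumptions):
    #
    #     curl "http://localhost:5000/model/topology/heron/current/my-topo?cluster=prod&environ=devel&model=all"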


class HeronProposed(Resource):
    """ Resource class for predicting a new packing plan for a topology,
    given its current or future traffic. """
    def __init__(self, model_classes: List[Type],
                 model_config: Dict[str, Any],
                 traffic_config: Dict[str, Any],
                 metrics_client: HeronMetricsClient,
                 graph_client: GremlinClient, tracker_url: str) -> None:
self.metrics_client: HeronMetricsClient = metrics_client
self.graph_client: GremlinClient = graph_client
self.tracker_url: str = tracker_url
self.model_config: Dict[str, Any] = model_config
self.traffic_config: Dict[str, Any] = traffic_config
self.models: Dict[str, HeronTopologyModel] = {}
for model_class in model_classes:
model = model_class(model_config, metrics_client, graph_client)
self.models[model.name] = model
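        # Valid values for the <traffic_source> URL segment handled by get()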
self.CURRENT = "current"
self.FUTURE = "future"
super().__init__()

    def get(self, topology_id: str,
            traffic_source: str) -> Tuple[Dict[str, Any], int]:
        """ Method handling GET requests to the topology packing plan
        modelling endpoint. """
# Checking to make sure we have required arguments
errors: List[Dict[str, str]] = []
if "cluster" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'cluster' parameter should be supplied"})
if "environ" not in request.args:
errors.append({"type": "MissingParameter",
"error": "'environ' parameter should be supplied"})
if "model" not in request.args:
errors.append({"type": "MissingParameter",
"error": ("At least one 'model' parameter should "
"be supplied. Supply 'all' to run all "
"configured models")})
# Return useful errors to the client if any parameters are missing
if errors:
return {"errors": errors}, 400
LOG.info("Processing performance modelling request for topology: %s, "
"cluster: %s, environment: %s, using model: %s", topology_id,
request.args.get("cluster"), request.args.get("environ"),
str(request.args.getlist("model")))
cluster = request.args.get("cluster")
environ = request.args.get("environ")
# Make sure we have a current graph representing the physical plan for
# the topology
graph_check(self.graph_client, self.model_config, self.tracker_url,
cluster, environ, topology_id)
# Make sure we have a file containing all paths for the job
paths_check(self.graph_client, self.model_config, cluster,
environ, topology_id)
if "all" in request.args.getlist("model"):
LOG.info("Running all configured Heron topology performance "
"models")
models = self.models.keys()
else:
models = request.args.getlist("model")
# Convert the request.args to a dict suitable for passing as **kwargs
model_kwargs: Dict[str, Any] = \
utils.convert_wimd_to_dict(request.args)
        # Remove the model, cluster and environ keys from the kwargs as they
        # are only needed by this method
model_kwargs.pop("model")
model_kwargs.pop("cluster")
model_kwargs.pop("environ")
start, end = get_start_end_times(**model_kwargs)
        # The traffic source can be one of two values: current or future. If
        # it is future, we must first create an object that gathers together
        # the predicted traffic information. Otherwise, if it is current, we
        # simply propose a packing plan based on current information
        if traffic_source == self.CURRENT:
            traffic_provider = CurrentTraffic(
                self.metrics_client, self.graph_client, topology_id, cluster,
                environ, start, end, {}, **model_kwargs)
        elif traffic_source == self.FUTURE:
            # The predicted traffic object is initialised from the forecast
            # of future traffic and provides functions to convert the
            # predicted traffic into arrival rates
            traffic_provider = PredictedTraffic(
                self.metrics_client, self.graph_client, topology_id, cluster,
                environ, start, end, self.traffic_config, **model_kwargs)
        else:
            errors.append(
                {"type": "ValueError",
                 "error": (f"'{traffic_source}' is not a valid traffic "
                           "source. Please specify either 'current' or "
                           "'future' and provide parameters accordingly.")})
            return {"errors": errors}, 400
model_kwargs["zk.time.offset"] = self.model_config["zk.time.offset"]
model_kwargs["heron.statemgr.root.path"] = self.model_config["heron.statemgr.root.path"]
model_kwargs["heron.statemgr.connection.string"] = self.model_config["heron.statemgr.connection.string"]
        output = {}
        for model_name in models:
            LOG.info("Running topology packing plan model %s", model_name)
            model = self.models[model_name]
            try:
                results: list = model.predict_packing_plan(
                    topology_id=topology_id, cluster=cluster, environ=environ,
                    start=start, end=end, traffic_provider=traffic_provider,
                    **model_kwargs)
            except Exception as err:
                LOG.error("Error running model: %s -> %s", model.name,
                          str(err))
                errors.append({"model": model.name, "type": str(type(err)),
                               "error": str(err)})
            else:
                output[model_name] = json.dumps(results)
        if errors:
            return {"errors": errors}, 500
        return output, 200
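
    # A hypothetical invocation of this endpoint (host, port and URL prefix
    # are assumptions):
    #
    #     curl "http://localhost:5000/model/topology/heron/proposed/my-topo/future?cluster=prod&environ=devel&model=all"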