api/model/traffic/heron.py (95 lines of code) (raw):

# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0

""" This module contains the API resources for the Apache Heron traffic
modelling """

import logging

from flask import request
from flask_restful import Resource
from typing import List, Dict, Type, Any, Tuple

from caladrius.api import utils
from caladrius.metrics.heron.client import HeronMetricsClient
from caladrius.model.traffic.heron.base import HeronTrafficModel
from caladrius.graph.gremlin.client import GremlinClient
from caladrius.graph.utils.heron import graph_check

LOG: logging.Logger = logging.getLogger(__name__)


class HeronTrafficModels(Resource):
    """ Resource class for the traffic model information endpoint. """

    def __init__(self, model_classes: List[Type]) -> None:
        """ Records the "name" and "description" attributes of every supplied
        model class so they can be returned by the get method.

        Arguments:
            model_classes (list):   The traffic model classes to describe.
                                    Each must expose "name" and "description"
                                    attributes.
        """
        self.models_info: List[Dict[str, Any]] = [
            {"name": model.name, "description": model.description}
            for model in model_classes
        ]
        super().__init__()

    def get(self) -> List[Dict[str, Any]]:
        """ Returns the configured traffic models as a list of model
        information dictionaries that contain "name" and "description" keys.
        """
        return self.models_info


class HeronTraffic(Resource):
    """ This resource handles requests for traffic modelling of specific
    topologies. """

    def __init__(self, model_classes: List[Type], model_config: Dict[str, Any],
                 metrics_client: HeronMetricsClient,
                 graph_client: GremlinClient, tracker_url: str) -> None:
        """ Stores the supplied clients and configuration and creates one
        instance of every supplied model class, keyed by the class's "name"
        attribute.

        Arguments:
            model_classes (list):   The traffic model classes to instantiate.
                                    Each is called with the model config, the
                                    metrics client and the graph client.
            model_config (dict):    Configuration passed to every model and to
                                    the graph check.
            metrics_client (HeronMetricsClient):    Client handed to every
                                                    model instance.
            graph_client (GremlinClient):   Client handed to every model
                                            instance and to the graph check.
            tracker_url (str):  URL passed to the graph check.
        """
        self.metrics_client: HeronMetricsClient = metrics_client
        self.graph_client: GremlinClient = graph_client
        self.tracker_url: str = tracker_url
        self.model_config: Dict[str, Any] = model_config

        self.models: Dict[str, HeronTrafficModel] = {
            model_class.name: model_class(model_config, metrics_client,
                                          graph_client)
            for model_class in model_classes
        }

        super().__init__()

    def get(self, topology_id) -> Tuple[Dict[str, Any], int]:
        """ Runs the requested traffic models for the supplied topology.

        The request must carry "cluster", "environ" and at least one "model"
        query parameter. A "model" value of "all" runs every configured
        model. Any remaining query parameters are forwarded to the models'
        predict_traffic methods as keyword arguments.

        Arguments:
            topology_id:    Identifier of the topology to model. It is passed
                            to the graph check and to every model.

        Returns:
            A (body, status code) tuple:

            * 200 with a dictionary mapping each model name to that model's
              results when every model ran successfully.
            * 400 with an "errors" list when a required parameter is missing,
              when a requested model name is not configured, or when the
              graph check raises.
            * 500 with an "errors" list when at least one model raised. The
              results of the models that succeeded are not returned.
        """
        errors: List[Dict[str, str]] = []

        # Make sure we have the args we need
        for required in ("cluster", "environ"):
            if required not in request.args:
                errors.append(
                    {"type": "MissingParameter",
                     "error": f"'{required}' parameter should be supplied"})

        if "model" not in request.args:
            errors.append({"type": "MissingParameter",
                           "error": ("At least one 'model' parameter should "
                                     "be supplied. Supply 'all' to run all "
                                     "configured models")})

        # Return useful errors to the client if any parameters are missing
        if errors:
            return {"errors": errors}, 400

        requested_models: List[str] = request.args.getlist("model")
        run_all: bool = "all" in requested_models

        # Reject unknown model names up front. Otherwise the lookup in
        # self.models further down raises an unhandled KeyError, which the
        # client only sees as a generic server error, and it would do so only
        # after the graph check has already been run.
        if not run_all:
            for name in requested_models:
                if name not in self.models:
                    errors.append(
                        {"model": name,
                         "type": "UnknownModel",
                         "error": (f"'{name}' is not a configured traffic "
                                   "model")})
            if errors:
                return {"errors": errors}, 400

        cluster: str = request.args["cluster"]
        environ: str = request.args["environ"]

        LOG.info("Traffic prediction requested for Heron topology: %s on "
                 "cluster: %s in environment: %s", topology_id, cluster,
                 environ)

        # Make sure we have a current graph representing the physical plan for
        # the topology
        try:
            graph_check(self.graph_client, self.model_config,
                        self.tracker_url, cluster, environ, topology_id)
        except Exception as err:
            LOG.error("Error running graph check for topology: %s -> %s",
                      topology_id, str(err))
            errors.append({"topology": topology_id,
                           "type": str(type(err)),
                           "error": str(err)})
            return {"errors": errors}, 400

        if run_all:
            LOG.info("Running all configured Heron traffic performance models")
            model_names: List[str] = list(self.models)
        else:
            model_names = requested_models

        # Convert the request.args to a dict suitable for passing as **kwargs
        model_kwargs: Dict[str, Any] = \
            utils.convert_wimd_to_dict(request.args)

        # Remove the models list from the kwargs as it is only needed by this
        # method, same with the cluster and environ values which are passed
        # explicitly below. The default keeps this safe whatever keys the
        # conversion helper returns.
        for consumed in ("model", "cluster", "environ"):
            model_kwargs.pop(consumed, None)

        output: Dict[str, Any] = {}

        for model_name in model_names:
            LOG.info("Running traffic performance model %s", model_name)

            model: HeronTrafficModel = self.models[model_name]

            try:
                results: Dict[str, Any] = model.predict_traffic(
                    topology_id=topology_id, cluster=cluster,
                    environ=environ, **model_kwargs)
            except Exception as err:
                LOG.error("Error running model: %s -> %s", model.name,
                          str(err))
                errors.append({"model": model.name,
                               "type": str(type(err)),
                               "error": str(err)})
            else:
                output[model_name] = results

        if errors:
            return {"errors": errors}, 500

        return output, 200