in api/model/topology/heron.py [0:0]
def post(self, topology_id: str) -> Tuple[Dict[str, Any], int]:
    """ Method handling POST requests to the current topology performance
    modelling endpoint.

    Runs the requested performance models against the topology, using the
    spout traffic supplied in the JSON request body.

    Query parameters:
        cluster:    The cluster the topology is running on (required).
        environ:    The environment the topology is running in (required).
        model:      The name of a configured model to run (required, may be
                    repeated). The value 'all' runs every configured model.
        All remaining query parameters are converted with
        ``utils.convert_wimd_to_dict`` and passed to each model's
        ``predict_current_performance`` method as keyword arguments.

    Request body:
        A JSON object whose keys are task IDs (strings that parse as
        integers) and whose values are the traffic dictionaries for those
        tasks.

    Arguments:
        topology_id (str):  The topology identification string.

    Returns:
        A (response body, HTTP status code) tuple:
        * 200 with a dict mapping each model name to its results serialised
          via ``DataFrame.to_json``;
        * 400 with an "errors" list if a required parameter is missing, a
          requested model is not configured, the body is not a valid traffic
          object, or the graph check fails;
        * 500 with an "errors" list if any model raised while running. The
          results of models that did succeed are not returned in this case.
    """
    # Make sure we have the args we need
    errors: List[Dict[str, str]] = []
    if "cluster" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'cluster' parameter should be supplied"})
    if "environ" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'environ' parameter should be supplied"})
    if "model" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": ("At least one 'model' parameter should "
                                 "be supplied. Supply 'all' to run all "
                                 "configured models")})
    # Return useful errors to the client if any parameters are missing
    if errors:
        return {"errors": errors}, 400

    cluster: str = request.args["cluster"]
    environ: str = request.args["environ"]
    requested_models: List[str] = request.args.getlist("model")

    LOG.info("Processing performance modelling request for topology: %s, "
             "cluster: %s, environment: %s, using model: %s",
             topology_id, cluster, environ, str(requested_models))

    # Resolve the model names up front so an unknown name is reported as a
    # client error instead of surfacing as an unhandled KeyError (HTTP 500)
    # after the graph check has already run.
    if "all" in requested_models:
        LOG.info("Running all configured Heron topology performance "
                 "models")
        models: List[str] = list(self.models.keys())
    else:
        # dict.fromkeys drops repeated names (which would otherwise run the
        # same model twice) while preserving the order they were given in.
        models = list(dict.fromkeys(requested_models))
        for model_name in models:
            if model_name not in self.models:
                errors.append({
                    "type": "UnknownModel",
                    "error": (f"'{model_name}' is not a configured model. "
                              "Supply 'all' to run all configured models")})
        if errors:
            return {"errors": errors}, 400

    # Get the spout traffic state and convert the json string task IDs to
    # integers. silent=True makes get_json return None for a missing,
    # non-JSON or malformed body, which is reported below as a 400 in this
    # endpoint's error format rather than crashing on None.items().
    json_traffic: Any = request.get_json(silent=True)
    if not isinstance(json_traffic, dict):
        errors.append({"type": "InvalidBody",
                       "error": ("The request body should be a JSON object "
                                 "mapping task IDs to spout traffic")})
        return {"errors": errors}, 400
    try:
        traffic: Dict[int, Dict[str, float]] = \
            {int(key): value for key, value in json_traffic.items()}
    except ValueError as err:
        errors.append({"type": "InvalidBody",
                       "error": f"Task IDs should be integers: {err}"})
        return {"errors": errors}, 400

    # Make sure we have a current graph representing the physical plan for
    # the topology
    try:
        graph_check(self.graph_client, self.model_config, self.tracker_url,
                    cluster, environ, topology_id)
    except Exception as err:
        LOG.exception("Error running graph check for topology: %s -> %s",
                      topology_id, str(err))
        errors.append({"topology": topology_id,
                       "type": str(type(err)),
                       "error": str(err)})
        return {"errors": errors}, 400

    # Convert the request.args to a dict suitable for passing as **kwargs
    model_kwargs: Dict[str, Any] = \
        utils.convert_wimd_to_dict(request.args)
    # Remove the keys consumed by this method itself so they are not
    # forwarded to the models
    for key in ("model", "cluster", "environ"):
        model_kwargs.pop(key, None)

    output: Dict[str, Any] = {}
    for model_name in models:
        LOG.info("Running topology performance model %s", model_name)
        model = self.models[model_name]
        try:
            results: pd.DataFrame = model.predict_current_performance(
                topology_id=topology_id,
                cluster=cluster,
                environ=environ,
                spout_traffic=traffic, **model_kwargs)
        except Exception as err:
            # Deliberately broad: one failing model should not prevent the
            # remaining models from running; all failures are reported
            # together below.
            LOG.exception("Error running model: %s -> %s", model.name,
                          str(err))
            errors.append({"model": model.name, "type": str(type(err)),
                           "error": str(err)})
        else:
            output[model_name] = results.to_json()

    if errors:
        return {"errors": errors}, 500
    return output, 200