in api/model/topology/heron.py [0:0]
def get(self, topology_id: str, traffic_source: str):
    """Handle GET requests to the topology packing plan modelling endpoint.

    Args:
        topology_id: The ID of the Heron topology to be modelled.
        traffic_source: URI segment selecting the traffic provider; must
            be one of ``self.CURRENT`` or ``self.FUTURE``.

    Returns:
        Either a ``({"errors": [...]}, 400)`` tuple when request
        validation fails, or the combined list of packing plan results
        from every requested model.
    """
    # Checking to make sure we have required arguments
    errors: List[Dict[str, str]] = []
    if "cluster" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'cluster' parameter should be supplied"})
    if "environ" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'environ' parameter should be supplied"})
    if "model" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": ("At least one 'model' parameter should "
                                 "be supplied. Supply 'all' to run all "
                                 "configured models")})
    # Validate the traffic source URI segment up front, before any of the
    # expensive graph/paths checks below are run. Previously this was only
    # checked at the end and returned a bare error list, inconsistent with
    # the {"errors": ...} response shape used for the other failures.
    if traffic_source not in (self.CURRENT, self.FUTURE):
        errors.append(
            {"type": "ValueError",
             "error": (f"{traffic_source} is an incorrect URI. Please either specify"
                       f" future or current as possible values and provide parameters"
                       f" accordingly.")})

    # Return useful errors to the client if any parameters are missing
    if errors:
        return {"errors": errors}, 400

    LOG.info("Processing performance modelling request for topology: %s, "
             "cluster: %s, environment: %s, using model: %s", topology_id,
             request.args.get("cluster"), request.args.get("environ"),
             str(request.args.getlist("model")))

    cluster = request.args.get("cluster")
    environ = request.args.get("environ")

    # Make sure we have a current graph representing the physical plan for
    # the topology
    graph_check(self.graph_client, self.model_config, self.tracker_url,
                cluster, environ, topology_id)

    # Make sure we have a file containing all paths for the job
    paths_check(self.graph_client, self.model_config, cluster,
                environ, topology_id)

    if "all" in request.args.getlist("model"):
        LOG.info("Running all configured Heron topology performance "
                 "models")
        models = self.models.keys()
    else:
        models = request.args.getlist("model")

    # Convert the request.args to a dict suitable for passing as **kwargs
    model_kwargs: Dict[str, Any] = \
        utils.convert_wimd_to_dict(request.args)

    # These keys are consumed by this method itself, not by the models.
    model_kwargs.pop("model")
    model_kwargs.pop("cluster")
    model_kwargs.pop("environ")

    start, end = get_start_end_times(**model_kwargs)

    # traffic_source selects the provider: "current" proposes a packing
    # plan from live metrics; "future" wraps the traffic forecast in an
    # object that converts predictions into arrival rates. (The value was
    # validated above, so no other case is possible here.)
    traffic_provider: Union[CurrentTraffic, PredictedTraffic]
    if traffic_source == self.CURRENT:
        traffic_provider = CurrentTraffic(
            self.metrics_client, self.graph_client, topology_id, cluster,
            environ, start, end, {}, **model_kwargs)
    else:
        traffic_provider = PredictedTraffic(
            self.metrics_client, self.graph_client, topology_id, cluster,
            environ, start, end, self.traffic_config, **model_kwargs)

    # Forward the state manager / ZooKeeper settings the models need.
    model_kwargs["zk.time.offset"] = self.model_config["zk.time.offset"]
    model_kwargs["heron.statemgr.root.path"] = \
        self.model_config["heron.statemgr.root.path"]
    model_kwargs["heron.statemgr.connection.string"] = \
        self.model_config["heron.statemgr.connection.string"]

    # Accumulate results across all requested models. Previously the loop
    # overwrote `results` on each iteration, so only the final model's
    # output was ever returned when several models were requested.
    combined_results: list = []
    for model_name in models:
        LOG.info("Running topology packing plan model %s", model_name)
        model = self.models[model_name]
        combined_results.extend(
            model.predict_packing_plan(topology_id=topology_id,
                                       cluster=cluster,
                                       environ=environ,
                                       start=start,
                                       end=end,
                                       traffic_provider=traffic_provider,
                                       **model_kwargs))
    return combined_results