in api/model/topology/heron.py [0:0]
def get(self, topology_id: str) -> Tuple[Dict[str, Any], int]:
    """ Method handling requests for the currently running topology's end to
    end latency.

    Arguments:
        topology_id (str): The topology identifier string from the URL.

    Required query parameters: 'cluster', 'environ' and at least one
    'model' (or 'all' to run every configured model).

    Returns:
        A (payload, status_code) tuple:
        * 400 with an "errors" list if required parameters are missing or
          an unknown model name is requested.
        * 500 with an "errors" list if any model raises during execution.
        * 200 with a mapping of model name -> JSON-encoded results
          otherwise.
    """
    # Validate required query parameters up front so the client receives a
    # single consolidated 400 listing everything that is missing.
    errors: List[Dict[str, str]] = []
    if "cluster" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'cluster' parameter should be supplied"})
    if "environ" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": "'environ' parameter should be supplied"})
    if "model" not in request.args:
        errors.append({"type": "MissingParameter",
                       "error": ("At least one 'model' parameter should "
                                 "be supplied. Supply 'all' to run all "
                                 "configured models")})
    # Return useful errors to the client if any parameters are missing
    if errors:
        return {"errors": errors}, 400

    LOG.info("Processing performance modelling request for topology: %s, "
             "cluster: %s, environment: %s, using model: %s", topology_id,
             request.args.get("cluster"), request.args.get("environ"),
             str(request.args.getlist("model")))

    cluster = request.args.get("cluster")
    environ = request.args.get("environ")

    # Make sure we have a current graph representing the physical plan for
    # the topology
    graph_check(self.graph_client, self.model_config, self.tracker_url,
                cluster, environ, topology_id)

    # Make sure we have a file containing all paths for the job
    paths_check(self.graph_client, self.model_config, cluster,
                environ, topology_id)

    if "all" in request.args.getlist("model"):
        LOG.info("Running all configured Heron topology performance "
                 "models")
        models = list(self.models.keys())
    else:
        models = request.args.getlist("model")

    # BUG FIX: previously an unknown model name caused an uncaught
    # KeyError at self.models[model_name] below (an unhandled 500 crash).
    # Reject unknown names explicitly with a descriptive 400 instead.
    unknown_models = [name for name in models if name not in self.models]
    if unknown_models:
        return {"errors": [
            {"type": "UnknownModel",
             "error": ("Model '{}' is not configured. Available models: "
                       "{}".format(name, ", ".join(self.models.keys())))}
            for name in unknown_models]}, 400

    # Convert the request.args to a dict suitable for passing as **kwargs
    model_kwargs: Dict[str, Any] = \
        utils.convert_wimd_to_dict(request.args)

    # Remove the models list from the kwargs as it is only needed by this
    # method; 'cluster' and 'environ' are passed explicitly below.
    model_kwargs.pop("model")
    model_kwargs.pop("cluster")
    model_kwargs.pop("environ")

    # Forward the state-manager connection settings the models need from
    # the service configuration.
    model_kwargs["zk.time.offset"] = self.model_config["zk.time.offset"]
    model_kwargs["heron.statemgr.root.path"] = \
        self.model_config["heron.statemgr.root.path"]
    model_kwargs["heron.statemgr.connection.string"] = \
        self.model_config["heron.statemgr.connection.string"]

    start, end = get_start_end_times(**model_kwargs)

    # All models share one traffic provider so the metrics window is
    # fetched once rather than per model.
    traffic_provider: CurrentTraffic = CurrentTraffic(
        self.metrics_client, self.graph_client, topology_id, cluster,
        environ, start, end, {}, **model_kwargs)

    output = {}
    for model_name in models:
        LOG.info("Running topology performance model %s", model_name)
        model = self.models[model_name]
        try:
            results: list = model.find_current_instance_waiting_times(
                topology_id=topology_id, cluster=cluster,
                environ=environ, traffic_source=traffic_provider,
                start=start, end=end, **model_kwargs)
        except Exception as err:
            # A failing model should not abort the others; collect the
            # error and keep going. All failures are reported together.
            LOG.error("Error running model: %s -> %s", model.name,
                      str(err))
            errors.append({"model": model.name, "type": str(type(err)),
                           "error": str(err)})
        else:
            # NOTE(review): results are JSON-encoded here and the whole
            # payload is presumably serialized again by the framework,
            # producing a double-encoded string value — confirm clients
            # expect this.
            output[model_name] = json.dumps(results)

    # Any model failure yields a 500 (successful results are discarded,
    # matching the original behavior).
    if errors:
        return {"errors": errors}, 500

    return output, 200