in model/traffic/heron/prophet.py [0:0]
def build_component_models(
        metric_client: HeronMetricsClient, tracker_url: str, topology_id: str,
        cluster: str, environ: str, start: Optional[dt.datetime] = None,
        end: Optional[dt.datetime] = None,
        spout_emits: Optional[pd.DataFrame] = None
) -> DefaultDict[str, Dict[str, Prophet]]:
    """Fit one Prophet traffic model per (spout component, stream) pair.

    The training data is either supplied directly via ``spout_emits`` or
    fetched with ``get_spout_emissions`` for the ``[start, end]`` window.

    Args:
        metric_client: Client passed through to ``get_spout_emissions``.
        tracker_url: Heron tracker URL passed to ``get_spout_emissions``.
        topology_id: Identifier of the topology whose spouts are modelled.
        cluster: Cluster name passed to ``get_spout_emissions``.
        environ: Environment name passed to ``get_spout_emissions``.
        start: Start of the metrics window; required when ``spout_emits``
            is not given.
        end: End of the metrics window; required when ``spout_emits`` is
            not given.
        spout_emits: Pre-fetched emission metrics. Must contain the columns
            ``component``, ``stream``, ``timestamp`` and ``emit_count``.
            When given it is used as-is and ``start``/``end`` are ignored.

    Returns:
        A ``defaultdict`` mapping spout component name -> stream name ->
        fitted ``Prophet`` model.

    Raises:
        RuntimeError: If ``spout_emits`` is not given and either ``start``
            or ``end`` is missing.
    """
    LOG.info("Creating traffic models for spout components of topology %s",
             topology_id)

    if spout_emits is None:
        # Without pre-fetched data we need a complete time window to query
        # the metrics; previously the "neither given" case slipped through
        # and crashed later with an AttributeError on None.
        if start is None or end is None:
            err: str = ("Either start and end datetime instances or the spout "
                        "emits should be provided")
            LOG.error(err)
            raise RuntimeError(err)
        spout_emits = get_spout_emissions(metric_client, tracker_url,
                                          topology_id, cluster, environ,
                                          start, end)

    # Collapse to one mean emit count per (component, stream, timestamp).
    # NOTE(review): multiple rows per timestamp presumably come from
    # separate spout instances — confirm against get_spout_emissions.
    # Selecting "emit_count" before .mean() avoids averaging (and, on
    # pandas >= 2.0, failing on) any unrelated non-numeric columns.
    spout_comp_emits: pd.DataFrame = (
        spout_emits.groupby(["component", "stream", "timestamp"])
        ["emit_count"].mean().reset_index())

    output: DefaultDict[str, Dict[str, Prophet]] = defaultdict(dict)

    for (spout_comp, stream), data in spout_comp_emits.groupby(["component",
                                                                "stream"]):
        LOG.info("Creating traffic model for spout %s stream %s", spout_comp,
                 stream)
        # Prophet expects the time column to be named "ds" and the target
        # column "y".
        df: pd.DataFrame = (data[["timestamp", "emit_count"]]
                            .rename(index=str, columns={"timestamp": "ds",
                                                        "emit_count": "y"}))
        model: Prophet = Prophet()
        model.fit(df)
        output[spout_comp][stream] = model

    return output