in model/traffic/heron/prophet.py [0:0]
def build_instance_models(
        metric_client: HeronMetricsClient, tracker_url: str, topology_id: str,
        cluster: str, environ: str, start: Optional[dt.datetime] = None,
        end: Optional[dt.datetime] = None,
        spout_emits: Optional[pd.DataFrame] = None) -> INSTANCE_MODELS:
    """ Fits one Prophet model for every (component, task, stream) group found
    in the spout emit counts.

    The emit counts are taken from `spout_emits` when it is supplied.
    Otherwise they are fetched via `get_spout_emissions` for the period
    between `start` and `end`, both of which are then required.

    Arguments:
        metric_client (HeronMetricsClient): Forwarded to
                                            `get_spout_emissions`.
        tracker_url (str): Forwarded to `get_spout_emissions`.
        topology_id (str): Forwarded to `get_spout_emissions`.
        cluster (str): Forwarded to `get_spout_emissions`.
        environ (str): Forwarded to `get_spout_emissions`.
        start (dt.datetime): Optional start of the period to fetch. Only used
                             when `spout_emits` is not supplied.
        end (dt.datetime): Optional end of the period to fetch. Only used
                           when `spout_emits` is not supplied.
        spout_emits (pd.DataFrame): Optional pre-fetched emit counts. Must
                                    contain the "component", "task",
                                    "stream", "timestamp" and "emit_count"
                                    columns.

    Returns:
        A nested mapping of component -> task -> stream -> fitted Prophet
        model.

    Raises:
        RuntimeError: If `spout_emits` is not supplied and `start` and `end`
                      are not both supplied.
    """
    if spout_emits is not None:
        # Pre-fetched data always wins; start and end are ignored here.
        emissions: pd.DataFrame = cast(pd.DataFrame, spout_emits)
    elif start and end:
        emissions = get_spout_emissions(
            metric_client, tracker_url, topology_id, cluster, environ,
            start, end)
    else:
        # Nothing to build the models from. The message distinguishes a
        # half-specified period from no input at all.
        if start or end:
            problem: str = ("Both start and end datetimes should be provided "
                            "if spout "
                            "emits DataFrame is not")
        else:
            problem = ("Either start and end datetimes or the spout emits "
                       "should be "
                       "provided")
        LOG.error(problem)
        raise RuntimeError(problem)

    key_columns = ["component", "task", "stream"]
    relevant: pd.DataFrame = emissions[
        ["component", "task", "stream", "timestamp", "emit_count"]]
    grouped: pd.core.groupby.DataFrameGroupBy = relevant.groupby(key_columns)

    models: INSTANCE_MODELS = defaultdict(lambda: defaultdict(dict))

    for group_key, group_rows in grouped:
        component, task_id, stream_name = group_key

        # The time and value columns are renamed to "ds" and "y" before
        # fitting; index=str additionally maps every index label through str.
        series: pd.DataFrame = group_rows[["timestamp", "emit_count"]]
        history: pd.DataFrame = series.rename(
            index=str, columns={"timestamp": "ds", "emit_count": "y"})

        forecaster: Prophet = Prophet()
        forecaster.fit(history)

        models[component][task_id][stream_name] = forecaster

    return models