in src/backend/entrypoints/llm_backend/api/session.py [0:0]
def get_action_history(session_id: str):
current_user = user.get()
session = session_manager_singleton.get(current_user.email, session_id)
actions = []
for task_id, timestamp in session.actions_history:
task = celery_app.AsyncResult(task_id)
args_str = ", ".join([str(arg) for arg in task.args])
kwargs_str = ", ".join([f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}" for k, v in task.kwargs.items()])
all_args_str = ", ".join(filter(None, [args_str, kwargs_str]))
action = f"{task.name}({all_args_str})"
actions.append(
dict(
start_date=datetime.fromtimestamp(timestamp).strftime("%Y-%m-%dT%H:%M:%S.%f"),
end_date__=task.date_done,
action=action,
result=str(task.result) if isinstance(task.result, Exception) else task.result,
)
)
if task.traceback:
actions[-1].update(traceback=task.traceback)
return actions