in src/backend/entrypoints/llm_backend/tasks.py [0:0]
def run_after_optimization_chord(self, results, pipeline_start_time: int, account_for_dividends: bool):
current_user = user.get()
session = session_manager_singleton.get(current_user.email, self.session_id)
try:
list_trading_stats_by_symbol, list_strategy_stats = [], []
for result in results:
trading_stats_by_symbol, strategy_stats = pickle.loads(result)
list_trading_stats_by_symbol.append(trading_stats_by_symbol)
list_strategy_stats.append(strategy_stats)
trading_statistics = get_combined_trading_statistics(
session, list_trading_stats_by_symbol, list_strategy_stats, apply_dividends=account_for_dividends
)
session.update(trading_statistics)
session.flow_status.promote_backtesting_step(
{
# TODO: safe only if running processes are within the same machine/pod, fine for one celery pod
"execution_time": time.time()
- pipeline_start_time,
}
)
session_manager_singleton.save(current_user.email, session)
payload = {"progress": PUBSUB_END_OF_DATA}
_safe_progress_publish(
pubsub=sync_redis_client,
channel=f"task-{self.request.id}-progress",
message=json.dumps(payload),
error_flag=False,
)
return FlowStatusResponseModel(
flow_status=session.flow_status.to_dict(),
).model_dump()
# TODO: handle only specific exceptions in prod
except (SoftTimeLimitExceeded, LLMBadResponseError, Exception) as e:
_handle_error(
session_id=session.session_id,
email=current_user.email,
step=Steps.OPTIMIZE,
exc_to_raise=e,
)