in src/backend/entrypoints/llm_backend/tasks.py [0:0]
def run_optimization_chord(self, pipeline_start_time: int, studies_names: list[str]):
    """Gather finished optimization studies and record the outcome on the session.

    Loads the result of every study in ``studies_names``, derives one
    minimization and one maximization result from them, promotes the
    session's optimization step with those results plus the elapsed
    pipeline time, persists the session, and publishes an end-of-data
    marker on this task's progress channel.

    Args:
        self: Object exposing ``session_id`` and ``request.id``
            (presumably the bound Celery task instance; the decorator is
            not visible in this chunk -- confirm).
        pipeline_start_time: Timestamp taken when the pipeline started; it
            is subtracted from ``time.time()``, so it should be on the same
            clock (epoch seconds).
        studies_names: Names of the studies whose results are loaded from
            ``settings.optimization.storage_url``.

    Returns:
        The ``model_dump()`` of a ``FlowStatusResponseModel`` built from the
        session's flow status. If an exception is caught, control goes to
        ``_handle_error``; NOTE(review): whether that helper re-raises is not
        visible here -- if it returns normally this function returns ``None``.
    """
    current_user = user.get()
    session = session_manager_singleton.get(current_user.email, self.session_id)
    try:
        study_results = [
            get_optimization_results(settings.optimization.storage_url, name)
            for name in studies_names
        ]
        best_minimum = _get_optimization_result(StudyDirection.MINIMIZE, study_results)
        best_maximum = _get_optimization_result(StudyDirection.MAXIMIZE, study_results)

        # TODO: the elapsed time is only trustworthy when the start timestamp
        # was taken on the same machine/pod; fine for one celery pod.
        elapsed = time.time() - pipeline_start_time
        response_model = OptimizationResponseModel(
            minimization=best_minimum,
            maximization=best_maximum,
            sampler=session.get("optimization_sampler"),
            target_func=session.get("optimization_target_func"),
        )
        # "execution_time" comes first so the dumped model fields are merged
        # after it, exactly as a single dict literal would do.
        step_payload = {"execution_time": elapsed, **response_model.model_dump()}
        session.flow_status.promote_optimization_step(step_payload)
        session_manager_singleton.save(current_user.email, session)

        # Signal on the task's progress channel that no more data will follow.
        _safe_progress_publish(
            pubsub=sync_redis_client,
            channel=f"task-{self.request.id}-progress",
            message=json.dumps({"progress": PUBSUB_END_OF_DATA}),
            error_flag=False,
        )

        flow_response = FlowStatusResponseModel(
            flow_status=session.flow_status.to_dict(),
        )
        return flow_response.model_dump()
    # TODO: handle only specific exceptions in prod
    except (SoftTimeLimitExceeded, LLMBadResponseError, Exception) as exc:
        _handle_error(
            session_id=session.session_id,
            email=current_user.email,
            step=Steps.OPTIMIZE,
            exc_to_raise=exc,
        )