src/backend/domain/services/steps/optimization.py (143 lines of code) (raw):

import logging
from copy import copy
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import optuna
import pandas as pd
from optuna import Study
from optuna.samplers import BaseSampler
from optuna.storages import BaseStorage
from optuna.study import StudyDirection
from optuna.trial import FrozenTrial

from market_alerts.containers import data_periodicities, data_providers
from market_alerts.domain.services.steps import indicator_step, trading_step
from market_alerts.domain.services.steps.utils import get_fx_rate, get_sparse_dividends

logger = logging.getLogger(__name__)


def save_new_keys_in_origin_session(session_dict_origin, session_dict):
    """Backfill the origin session with FX rates and dividends that only ``session_dict`` has.

    For every key present in ``session_dict["fx_rates"]`` /
    ``session_dict["dividends_by_symbol"]`` but missing from the origin, the
    data is (re)loaded for the *origin's* time line, date range, interval and
    data provider and stored in ``session_dict_origin`` in place.

    Args:
        session_dict_origin: Mapping that is updated in place. It is read for
            ``data_provider``, ``time_line``, ``start_date``, ``end_date``,
            ``interval`` and ``true_symbols``.
        session_dict: Mapping whose ``fx_rates`` / ``dividends_by_symbol``
            keys are compared against the origin. Only its keys are used.
    """
    if "fx_rates" in session_dict:
        origin_fx_rates = session_dict_origin.setdefault("fx_rates", {})
        new_keys = set(session_dict["fx_rates"]) - set(origin_fx_rates)
        if new_keys:
            provider = data_providers[session_dict_origin["data_provider"]]
            for fx_rate_symbol in new_keys:
                left_curr, right_curr = fx_rate_symbol.split("/")
                if left_curr == right_curr:
                    # Same currency on both sides: a constant rate of 1.0, no provider call needed.
                    origin_fx_rates[fx_rate_symbol] = pd.DataFrame(
                        data=1.0,
                        columns=["open", "high", "low", "close"],
                        index=session_dict_origin["time_line"],
                    )
                else:
                    origin_fx_rates[fx_rate_symbol] = get_fx_rate(
                        session_dict_origin["time_line"],
                        fx_rate_symbol,
                        session_dict_origin["start_date"],
                        session_dict_origin["end_date"],
                        data_periodicities[session_dict_origin["interval"]]["value"],
                        provider,
                    )

    if "dividends_by_symbol" in session_dict:
        origin_dividends = session_dict_origin.setdefault("dividends_by_symbol", {})
        new_keys = set(session_dict["dividends_by_symbol"]) - set(origin_dividends)
        if new_keys:
            provider = data_providers[session_dict_origin["data_provider"]]
            for symbol in new_keys:
                origin_dividends[symbol] = get_sparse_dividends(
                    time_line=session_dict_origin["time_line"],
                    provider=provider,
                    symbol=symbol,
                    div_end_date=session_dict_origin["end_date"],
                    div_start_date=session_dict_origin["start_date"],
                    true_symbols=session_dict_origin["true_symbols"],
                )


def objective(trial, session, target_function, apply_dividends: bool, train_size: float, is_trades_stats_needed: bool) -> Any:
    """Evaluate one Optuna trial: suggest parameters, run the pipeline, score it.

    The trial's suggested values are interpolated into the indicators code
    template, the indicator and trading steps are run, and ``target_function``
    is applied to the resulting session.

    Args:
        trial: Optuna trial used to suggest parameter values and to store the
            ``test_value`` user attribute.
        session: Session to evaluate. It is shallow-copied first so that the
            per-trial edits made here do not leak into the caller's session.
        target_function: Callable mapping a processed session to the score.
        apply_dividends: Forwarded to ``trading_step``.
        train_size: Fraction of the time line used in-sample. Values below
            1.0 enable an in-sample / out-of-sample split.
        is_trades_stats_needed: Forwarded to ``trading_step``.

    Returns:
        The score of the in-sample session (or of the full session when
        ``train_size`` is not below 1.0). The out-of-sample score, or ``0.0``
        when there is no split, is stored as the ``test_value`` user attribute.
    """
    session = copy(session)
    session.data = copy(session.data)
    session["u_strs"] = session["u_strs"].copy()

    range_by_param = session["range_by_param"]
    value_by_param = {}
    for key, (_, type_) in session.flow_status.parsed_optimization_params.items():
        bounds = range_by_param[key]
        if type_ == "int":
            value_by_param[key] = trial.suggest_int(key, bounds[0], bounds[1])
        elif type_ == "float":
            value_by_param[key] = trial.suggest_float(key, bounds[0], bounds[1])
        else:
            # Any other declared type is treated as a categorical list of choices.
            value_by_param[key] = trial.suggest_categorical(key, bounds)

    new_llm_response = "\n```python\n%s\n```\n```python\n%s\n```\n" % (
        session.flow_status.get_interpolated_indicators_code_template(value_by_param),
        session.flow_status.trading_code,
    )
    # The copies above are shallow, so the dialogue list is still shared with
    # the caller's session; replace the last entry on a private copy instead
    # of mutating the shared list.
    dialogue = list(session["indicators_dialogue"])
    dialogue[-1] = new_llm_response
    session["indicators_dialogue"] = dialogue

    n_total = len(session["time_line"])

    if train_size < 1.0:
        n_train = round(train_size * n_total)
        n_overlap = 250
        # The test slice reaches back up to n_overlap bars into the training
        # range (presumably as indicator warm-up - confirm against trading_step).
        test_start = max(0, n_train - n_overlap)

        session_train = session.get_slice(0, n_train)
        session_test = session.get_slice(test_start, n_total)

        logger.debug("Running in-sample...")
        indicator_step(session_train)
        for _ in trading_step(session_train, apply_dividends=apply_dividends, is_trades_stats_needed=is_trades_stats_needed):
            pass

        logger.debug("Running out-of-sample...")
        indicator_step(session_test)
        # Start at the first bar after the training range. This equals
        # n_overlap except when fewer than n_overlap training bars exist, in
        # which case a fixed n_overlap would skip out-of-sample bars.
        for _ in trading_step(
            session_test,
            apply_dividends=apply_dividends,
            start_idx=n_train - test_start,
            is_trades_stats_needed=is_trades_stats_needed,
        ):
            pass

        res = target_function(session_test)
        if isinstance(res, np.generic):
            # User attributes must be serializable by the storage backend;
            # unwrap NumPy scalars into plain Python numbers.
            res = res.item()
        trial.set_user_attr("test_value", res)

        return target_function(session_train)

    session = session.get_slice(0, n_total)
    indicator_step(session)
    for _ in trading_step(session, apply_dividends=apply_dividends, is_trades_stats_needed=is_trades_stats_needed):
        pass

    trial.set_user_attr("test_value", 0.0)

    return target_function(session)


def optimize(
    session,
    target_function,
    storage: str | BaseStorage,
    sampler: BaseSampler,
    study_name: str,
    study_direction: Optional[StudyDirection] = None,
    study_load_if_exists: bool = False,
    trial_callbacks: Optional[list[Callable[[Study, FrozenTrial], None]]] = None,
    n_trials: int = 5,
    train_size: float = 1.0,
    apply_dividends: bool = False,
    is_trades_stats_needed: bool = True,
) -> None:
    """Create (or load) an Optuna study and run ``n_trials`` trials of ``objective``.

    Args:
        session: Session passed to every ``objective`` call.
        target_function: Scoring callable passed to ``objective``.
        storage: Optuna storage URL or storage object.
        sampler: Optuna sampler used by the study.
        study_name: Name of the study to create or load.
        study_direction: Optimization direction passed to ``create_study``.
        study_load_if_exists: Passed to ``create_study`` as ``load_if_exists``.
        trial_callbacks: Callbacks passed to ``Study.optimize``.
        n_trials: Number of trials to run.
        train_size: In-sample fraction forwarded to ``objective``.
        apply_dividends: Forwarded to ``objective``.
        is_trades_stats_needed: Forwarded to ``objective``.
    """
    study = optuna.create_study(
        storage=storage,
        sampler=sampler,
        study_name=study_name,
        direction=study_direction,
        load_if_exists=study_load_if_exists,
    )

    def run_trial(trial):
        return objective(
            trial,
            session,
            target_function,
            apply_dividends=apply_dividends,
            train_size=train_size,
            is_trades_stats_needed=is_trades_stats_needed,
        )

    study.optimize(run_trial, n_trials=n_trials, callbacks=trial_callbacks)


def get_optimization_results(
    storage: Optional[str | BaseStorage],
    study_name: str,
) -> Tuple[Dict[str, Any], Study, List[Tuple[int, Optional[float], Optional[float], Dict[str, Any], Optional[float]]]]:
    """Load a study and summarise its trials.

    Args:
        storage: Optuna storage URL or storage object holding the study.
        study_name: Name of the study to load.

    Returns:
        A tuple ``(best_params, study, rows)`` where ``rows`` is sorted by
        trial number and each row is
        ``(trial.number + 1, value, test_value, params, duration_seconds)``.
        ``test_value`` is ``None`` for trials that never stored it, and
        ``duration_seconds`` is ``None`` for trials without a duration
        (e.g. still running), instead of raising.

    Raises:
        Whatever ``optuna.load_study`` / ``Study.best_params`` raise, e.g.
        when the study does not exist or has no completed trial.
    """
    study = optuna.load_study(study_name=study_name, storage=storage)

    rows = []
    for trial in sorted(study.get_trials(), key=lambda t: t.number):
        duration = trial.duration
        rows.append(
            (
                trial.number + 1,
                trial.value,
                trial.user_attrs.get("test_value"),
                trial.params,
                duration.total_seconds() if duration is not None else None,
            )
        )

    return study.best_params, study, rows


def delete_optimization_study(storage: Optional[str | BaseStorage], study_name: str) -> None:
    """Delete the named study from ``storage`` and log the deletion."""
    optuna.delete_study(study_name=study_name, storage=storage)
    logger.info("Study %s deleted successfully", study_name)