src/backend/domain/services/steps/calculate_trading.py (835 lines of code) (raw):

import inspect
import types
from typing import Any, Dict, Generator

import numpy as np
import pandas as pd

from market_alerts.containers import (
    data_periodicities,
    data_providers,
    trade_fill_prices,
)
from market_alerts.domain.constants import DATA_PROVIDER_FX_RATES
from market_alerts.domain.default_list import DefaultList
from market_alerts.domain.services.steps.risk_limit import *
from market_alerts.domain.services.steps.utils import (
    get_fx_rate,
    get_globals,
    get_print_redirect,
    get_sparse_dividends,
    unify_indexes,
)
from market_alerts.infrastructure.services.code import (
    compile_code,
    exec_code,
    get_code_sections,
)

# Column layout of every per-trade DataFrame produced by ``get_trade_stats``.
_TRADE_COLUMNS = [
    "long",
    "size",
    "entry time",
    "entry price",
    "entry price acct ccy",
    "exit time",
    "exit price",
    "exit price acct ccy",
    "net profit",
    "win",
    "net profit acct ccy",
    "days in trade",
]

# Result keys, in the order of the tuple returned by ``get_trade_stats``.
# "global_stats_by_symbol" (not "global_trade_stats") is the historical key name.
_TRADE_RESULT_KEYS = (
    "trades_by_symbol",
    "trade_stats_by_symbol",
    "long_trade_stats_by_symbol",
    "short_trade_stats_by_symbol",
    "all_trades",
    "global_stats_by_symbol",
    "long_global_trade_stats",
    "short_global_trade_stats",
)


def _fx_rate_symbol(fx_rate_template: str, actual_currency: str, currency: str) -> str:
    """Build the FX-rate symbol for ``currency`` from the provider's %-template.

    A plain currency is paired with the account currency; a value that already
    contains "/" is treated as an explicit "LEFT/RIGHT" pair.
    """
    if "/" not in currency:
        return fx_rate_template % (actual_currency, currency)
    left_curr, right_curr = currency.split("/")
    return fx_rate_template % (left_curr, right_curr)


def _resolve_fx(session_dict):
    """Return ``(fx_rates, fx_rate_symbol_by_symbol, fx_rates_info)`` for the session.

    ``fx_rates`` maps FX symbols to rate frames, ``fx_rate_symbol_by_symbol`` maps each
    tradable symbol to its FX symbol and ``fx_rates_info`` maps each tradable symbol to
    the "close" series of its FX rate. The account currency defaults to "USD".
    """
    actual_currency = session_dict["actual_currency"] if session_dict["actual_currency"] else "USD"
    fx_rates = get_actual_currency_fx_rates(session_dict, actual_currency)
    fx_rate_template = DATA_PROVIDER_FX_RATES[session_dict["data_provider"]]
    symbol_to_currency = session_dict["symbol_to_currency"]
    fx_rate_symbol_by_symbol = {
        s: _fx_rate_symbol(fx_rate_template, actual_currency, symbol_to_currency[s])
        for s in session_dict["tradable_symbols"]
    }
    fx_rates_info = {s: fx_rates[fx_rate_symbol_by_symbol[s]]["close"] for s in session_dict["tradable_symbols"]}
    return fx_rates, fx_rate_symbol_by_symbol, fx_rates_info


def _fill_prices_by_symbol(session_dict, tradable_symbols):
    """Ask the configured fill-price model for the per-symbol trade fill prices."""
    fill_price_model = trade_fill_prices.get_trade_price_by_backend_key(session_dict["fill_trade_price"])
    return fill_price_model.get_list_trade_fill_price_by_symbol(
        {key: session_dict["data_by_symbol"][key] for key in tradable_symbols}
    )


def _net_open_position(trading_stats_by_symbol, tradable_symbols):
    """Return max(sum of long values, sum of |short values|) per row, in account currency."""
    long_value = sum(
        [
            (trading_stats_by_symbol[s]["acct_ccy_value"] * (trading_stats_by_symbol[s]["acct_ccy_value"] > 0)).fillna(0)
            for s in tradable_symbols
        ]
    )
    short_value = sum(
        [
            (trading_stats_by_symbol[s]["acct_ccy_value"].abs() * (trading_stats_by_symbol[s]["acct_ccy_value"] < 0)).fillna(0)
            for s in tradable_symbols
        ]
    )
    return np.maximum(long_value, short_value)


def _cumulative_dividends(dividends_by_symbol, tradable_symbols, fx_rates, fx_rate_symbol_by_symbol, start_idx, end_idx):
    """Return cumulative dividends (quote ccy, account ccy) over ``[start_idx:end_idx]``.

    Only tradable symbols are included; non-finite account-currency dividends are
    treated as zero before accumulation.
    """
    cum_dividends_by_symbol = {}
    cum_acct_ccy_dividends_by_symbol = {}
    for key, value in dividends_by_symbol.items():
        if key not in tradable_symbols:
            continue
        cum_dividends_by_symbol[key] = value[start_idx:end_idx].cumsum()
        cum_acct_ccy_dividends_by_symbol[key] = (
            (value / fx_rates[fx_rate_symbol_by_symbol[key]]["close"]).fillna(0)[start_idx:end_idx].cumsum()
        )
    return cum_dividends_by_symbol, cum_acct_ccy_dividends_by_symbol


def _open_lot(open_lots, entries, exits, t, size, price, fx_rate, dividend_marks):
    """Register a new open lot entered at time ``t`` and prepare its (empty) exit list."""
    open_lots.append({"size": size, "price": price, "date": t})
    entry = {"size": size, "price": price, "acct_ccy_price": price / fx_rate}
    if dividend_marks is not None:
        entry["cum_dividend"], entry["acct_ccy_cum_dividend"] = dividend_marks
    entries[t] = entry
    exits[t] = []


def _record_exit(entries, exits, lot_date, size, t, price, fx_rate, dividend_marks):
    """Append an exit of ``size`` units at time ``t`` to the lot opened at ``lot_date``."""
    record = {"size": size, "price": price, "date": t, "acct_ccy_price": price / fx_rate}
    if dividend_marks is not None:
        entry = entries[lot_date]
        # Dividends earned by the closed units = growth of the cumulative dividend since entry.
        record["dividends"] = (dividend_marks[0] - entry["cum_dividend"]) * size
        record["acct_ccy_dividends"] = (dividend_marks[1] - entry["acct_ccy_cum_dividend"]) * size
    exits[lot_date].append(record)


def _match_symbol_trades(
    stats,
    fill_prices,
    fx_rates,
    execution_cost_bps,
    apply_dividends,
    cum_dividends,
    acct_ccy_cum_dividends,
):
    """Match one symbol's fills into entries and exits using FIFO lot accounting.

    Returns ``(entries, exits)``: ``entries`` maps an entry timestamp to the lot data and
    ``exits`` maps the same timestamp to the list of fills that closed (part of) the lot.
    Lots still open after the last row are closed at the last row's price.
    """
    entries = {}
    exits = {}
    open_lots = []
    dividend_marks = None
    t = price = fx_rate = None

    rows = zip(
        stats.index,
        stats["buy_alert"].values,
        stats["sell_alert"].values,
        stats["quantity"].values,
        fill_prices,
        fx_rates,
    )
    for idx, (t, buy_alert, sell_alert, quantity, price, fx_rate) in enumerate(rows):
        if not (buy_alert or sell_alert):
            continue

        if apply_dividends:
            if cum_dividends is not None:
                # The cumulative dividend of the previous row is used for a fill on this row.
                prev = max(idx - 1, 0)
                dividend_marks = (cum_dividends[prev], acct_ccy_cum_dividends[prev])
            else:
                dividend_marks = (0.0, 0.0)

        if execution_cost_bps is not None:
            # Buys fill above and sells fill below the reference price.
            if buy_alert:
                price = price + abs(price) * execution_cost_bps / 10000
            else:
                price = price - abs(price) * execution_cost_bps / 10000

        qty = quantity
        if not open_lots or np.sign(open_lots[0]["size"]) == np.sign(qty):
            # Same direction as the open position (or flat): simply add a lot.
            _open_lot(open_lots, entries, exits, t, qty, price, fx_rate, dividend_marks)
            continue

        # Opposite direction: consume the oldest lots first.
        while abs(qty) > 0 and open_lots:
            lot = open_lots[0]
            if abs(qty) >= abs(lot["size"]):
                _record_exit(entries, exits, lot["date"], lot["size"], t, price, fx_rate, dividend_marks)
                qty += lot["size"]
                del open_lots[0]
            else:
                _record_exit(entries, exits, lot["date"], -qty, t, price, fx_rate, dividend_marks)
                lot["size"] += qty
                qty = 0
        if abs(qty) > 0:
            # The fill was larger than the open position: the remainder flips the position.
            _open_lot(open_lots, entries, exits, t, qty, price, fx_rate, dividend_marks)

    # Close whatever is still open at the last seen row. NOTE(review): the dividend marks
    # used here are those of the last fill, not of the last row - confirm this is intended.
    while open_lots:
        lot = open_lots.pop(0)
        _record_exit(entries, exits, lot["date"], lot["size"], t, price, fx_rate, dividend_marks)

    return entries, exits


def _build_trades_frame(entries, exits, apply_dividends):
    """Turn matched entries/exits into a per-trade DataFrame with ``_TRADE_COLUMNS``."""
    trades = []
    for entry_t, fills in exits.items():
        entry = entries[entry_t]
        for fill in fills:
            net_profit = (fill["price"] - entry["price"]) * fill["size"]
            net_profit_fund = (fill["acct_ccy_price"] - entry["acct_ccy_price"]) * fill["size"]
            if apply_dividends:
                net_profit += fill["dividends"]
                net_profit_fund += fill["acct_ccy_dividends"]
            trades.append(
                [
                    fill["size"] > 0,  # long trade?
                    abs(fill["size"]),
                    entry_t,
                    entry["price"],
                    entry["acct_ccy_price"],
                    fill["date"],
                    fill["price"],
                    fill["acct_ccy_price"],
                    net_profit,
                    net_profit_fund > 0,  # a win is judged in account currency
                    net_profit_fund,
                    (fill["date"] - entry_t).days,
                ]
            )
    return pd.DataFrame(trades, columns=list(_TRADE_COLUMNS))


def _make_trade_stats(current_trades):
    """Summarise a trades DataFrame (see ``_TRADE_COLUMNS``) into a dict of statistics."""
    wins = current_trades["win"]
    losses = ~wins
    profit = current_trades["net profit acct ccy"]

    trade_stats = dict()
    trade_stats["# Trades"] = current_trades.shape[0]
    trade_stats["Winning Trades"] = wins.sum()
    trade_stats["Losing Trades"] = losses.sum()
    trade_stats["Win Ratio"] = wins.mean()
    trade_stats["Avg. Trade"] = round(profit.mean(), 2)
    trade_stats["Avg. Winning Trade"] = round(profit[wins].mean(), 2) if wins.any() else 0.0
    trade_stats["Avg. Losing Trade"] = round(profit[losses].mean(), 2) if losses.any() else 0.0

    if abs(trade_stats["Avg. Losing Trade"]) >= 1e-20:
        trade_stats["Payoff Ratio"] = trade_stats["Avg. Winning Trade"] / abs(trade_stats["Avg. Losing Trade"])
    else:
        trade_stats["Payoff Ratio"] = float("nan")

    # Profit relative to the traded notional (entry price * size), in basis points.
    traded_notional = (current_trades["entry price acct ccy"] * current_trades["size"]).sum()
    if abs(traded_notional) >= 1e-20:
        trade_stats["Profit Margin (Bps)"] = round(profit.sum() / traded_notional * 10000, 2)
    else:
        trade_stats["Profit Margin (Bps)"] = float("nan")

    trade_stats["Avg. Holding Days"] = round(current_trades["days in trade"].mean(), 2)
    return trade_stats


def get_trade_stats(
    trading_stats_by_symbol,
    tradable_symbols,
    list_trade_fill_price_by_symbol,
    fx_rates,
    execution_cost_bps,
    apply_dividends,
    cum_dividends_by_symbol,
    cum_acct_ccy_dividends_by_symbol,
):
    """Reconstruct individual trades from per-bar fills and summarise them.

    Parameters:
        trading_stats_by_symbol: per-symbol DataFrames with at least the "buy_alert",
            "sell_alert" and "quantity" columns; the index supplies the timestamps.
        tradable_symbols: symbols to process.
        list_trade_fill_price_by_symbol: per-symbol fill prices, aligned row by row
            with the frames above.
        fx_rates: per-symbol FX rates (fill price / rate = account-currency price),
            aligned the same way.
        execution_cost_bps: execution cost in basis points, or None for no cost.
        apply_dividends: whether dividends are added to the trade profit.
        cum_dividends_by_symbol, cum_acct_ccy_dividends_by_symbol: cumulative dividend
            series per symbol; only read when ``apply_dividends`` is true.

    Returns:
        An 8-tuple: trades by symbol, stats by symbol (all / long / short), all trades
        sorted by exit time, and global stats (all / long / short).
    """
    trades_by_symbol = dict()
    for symbol in tradable_symbols:
        if apply_dividends and symbol in cum_dividends_by_symbol:
            cum_dividends = cum_dividends_by_symbol[symbol].values
            acct_ccy_cum_dividends = cum_acct_ccy_dividends_by_symbol[symbol].values
        else:
            cum_dividends = None
            acct_ccy_cum_dividends = None

        entries, exits = _match_symbol_trades(
            trading_stats_by_symbol[symbol],
            list_trade_fill_price_by_symbol[symbol],
            fx_rates[symbol],
            execution_cost_bps,
            apply_dividends,
            cum_dividends,
            acct_ccy_cum_dividends,
        )
        trades_by_symbol[symbol] = _build_trades_frame(entries, exits, apply_dividends)

    trade_stats_by_symbol = {symbol: _make_trade_stats(trades_by_symbol[symbol]) for symbol in tradable_symbols}
    long_trade_stats_by_symbol = {
        symbol: _make_trade_stats(trades_by_symbol[symbol].loc[trades_by_symbol[symbol]["long"]])
        for symbol in tradable_symbols
    }
    short_trade_stats_by_symbol = {
        symbol: _make_trade_stats(trades_by_symbol[symbol].loc[~trades_by_symbol[symbol]["long"]])
        for symbol in tradable_symbols
    }

    list_trades = []
    for symbol in tradable_symbols:
        trades = trades_by_symbol[symbol].copy()
        if trades.shape[0] > 0:
            trades["symbol"] = symbol
            list_trades.append(trades)

    if list_trades:
        all_trades = pd.concat(list_trades).sort_values("exit time").reset_index(drop=True)
    else:
        # No trades at all: keep the historical shape (no "symbol" column).
        all_trades = pd.DataFrame([], columns=list(_TRADE_COLUMNS))

    global_trade_stats = _make_trade_stats(all_trades)
    long_global_trade_stats = _make_trade_stats(all_trades.loc[all_trades["long"]])
    short_global_trade_stats = _make_trade_stats(all_trades.loc[~all_trades["long"]])

    return (
        trades_by_symbol,
        trade_stats_by_symbol,
        long_trade_stats_by_symbol,
        short_trade_stats_by_symbol,
        all_trades,
        global_trade_stats,
        long_global_trade_stats,
        short_global_trade_stats,
    )


def get_actual_currency_fx_rates(session_dict, actual_currency: str) -> dict[str, Any]:
    """Make sure ``session_dict["fx_rates"]`` holds a rate for every quote currency.

    Missing rates are added to the session: a constant 1.0 frame when the quote currency
    equals ``actual_currency``, otherwise whatever ``get_fx_rate`` returns for the
    session's provider, dates and interval. Returns the rates passed through
    ``unify_indexes`` together with the session time line.
    """
    time_line = session_dict["time_line"]
    provider = data_providers[session_dict["data_provider"]]
    fx_rate_template = DATA_PROVIDER_FX_RATES[session_dict["data_provider"]]
    symbol_to_currency = session_dict["symbol_to_currency"]

    for s in set(symbol_to_currency.values()):
        fx_rate_symbol = _fx_rate_symbol(fx_rate_template, actual_currency, s)
        if fx_rate_symbol in session_dict["fx_rates"]:
            continue
        if s == actual_currency:
            session_dict["fx_rates"][fx_rate_symbol] = pd.DataFrame(
                data=1.0, columns=["open", "high", "low", "close"], index=time_line
            )
        else:
            session_dict["fx_rates"][fx_rate_symbol] = get_fx_rate(
                time_line,
                fx_rate_symbol,
                session_dict["start_date"],
                session_dict["end_date"],
                data_periodicities[session_dict["interval"]]["value"],
                provider,
            )

    return unify_indexes(session_dict["fx_rates"], time_line)


def get_sparse_dividends_for_each_tradable_symbol(session_dict) -> dict[str, Any]:
    """Return (and cache in the session) the dividends of every tradable symbol.

    A symbol is (re)loaded through ``get_sparse_dividends`` when it is not cached yet or
    when the session's "time_period" is greater than the period the cache was built for.
    """
    time_line = session_dict["time_line"]
    provider = data_providers[session_dict["data_provider"]]
    dividends_by_symbol = session_dict.setdefault("dividends_by_symbol", dict())
    new_time_period = session_dict["time_period"]
    dividends_time_period = session_dict.setdefault("dividends_time_period", new_time_period)

    for symbol in session_dict["tradable_symbols"]:
        if symbol not in dividends_by_symbol or new_time_period > dividends_time_period:
            dividends_by_symbol[symbol] = get_sparse_dividends(
                time_line=time_line,
                provider=provider,
                symbol=symbol,
                div_end_date=session_dict["end_date"],
                div_start_date=session_dict["start_date"],
                true_symbols=session_dict["true_symbols"],
            )

    # NOTE(review): the flattened source does not show whether this assignment sat inside
    # the loop/condition above; it is placed after the loop here - confirm.
    session_dict["dividends_time_period"] = new_time_period
    return dividends_by_symbol


def get_global_strategy_stats(
    trading_stats_by_symbol: dict[str, pd.DataFrame],
    strategy_stats: pd.DataFrame,
    tradable_symbols: list[str],
    time_line: pd.DatetimeIndex,
) -> dict[str, Any]:
    """Compute strategy-level statistics (P&L, turnover, drawdown, annualised figures).

    Parameters:
        trading_stats_by_symbol: per-symbol frames providing the "acct_ccy_cost" column.
        strategy_stats: frame providing "acct_ccy_pnl" and "acct_ccy_day_pnl".
        tradable_symbols: symbols included in the turnover figures.
        time_line: time line used to decide between 250 and 365 periods per year.
    """
    pos_cost = sum(
        [
            (trading_stats_by_symbol[s]["acct_ccy_cost"] * (trading_stats_by_symbol[s]["acct_ccy_cost"] > 0)).fillna(0)
            for s in tradable_symbols
        ]
    )
    neg_cost = sum(
        [
            (trading_stats_by_symbol[s]["acct_ccy_cost"].abs() * (trading_stats_by_symbol[s]["acct_ccy_cost"] < 0)).fillna(0)
            for s in tradable_symbols
        ]
    )

    # Estimate the number of bars per calendar year and snap it to 250 or 365.
    span_days = (time_line[-1] - time_line[0]).days
    if span_days > 0:
        aprox_n_trading_days = 365 * time_line.shape[0] / span_days
        n_trading_days_per_year = 250 if abs(250 - aprox_n_trading_days) < abs(365 - aprox_n_trading_days) else 365
    else:
        # A span shorter than one day used to raise ZeroDivisionError; this is the limit
        # of the formula above as the span shrinks.
        n_trading_days_per_year = 365

    acct_ccy_pnl = strategy_stats["acct_ccy_pnl"]
    acct_ccy_day_pnl = strategy_stats["acct_ccy_day_pnl"]

    global_strategy_stats = dict()
    # .iloc: the frame is label-indexed, so a plain [-1] is not a reliable positional lookup.
    global_strategy_stats["Gross Profit/Loss"] = round(acct_ccy_pnl.iloc[-1], 2)
    # Negative cost = money spent (bought); positive cost = money received (sold).
    global_strategy_stats["Turnover BOT"] = round(neg_cost.sum(), 2)
    global_strategy_stats["Turnover SOLD"] = round(pos_cost.sum(), 2)

    # Each drawdown is [peak, trough, first bar, last bar]; a new one starts at every new peak.
    drawdowns = [[0, 0, 0, 0]]
    for i, stg_pnl in enumerate(acct_ccy_pnl.values):
        if stg_pnl > drawdowns[-1][0]:
            drawdowns.append([stg_pnl, stg_pnl, i, i])
        else:
            drawdowns[-1][3] += 1
            drawdowns[-1][1] = drawdowns[-1][1] if drawdowns[-1][1] < stg_pnl else stg_pnl

    max_drawdown = max(drawdowns, key=lambda x: x[0] - x[1])
    max_drawdown_dur = max(drawdowns, key=lambda x: x[3] - x[2])

    global_strategy_stats["Max DD"] = round(max_drawdown[0] - max_drawdown[1], 2)
    if abs(global_strategy_stats["Max DD"]) < 1e-20:
        global_strategy_stats["Return/DD Ratio"] = float("nan")
    else:
        global_strategy_stats["Return/DD Ratio"] = global_strategy_stats["Gross Profit/Loss"] / global_strategy_stats["Max DD"]
    global_strategy_stats["Max DD duration"] = max_drawdown_dur[3] - max_drawdown_dur[2]

    global_strategy_stats["Annual Return"] = round(acct_ccy_day_pnl.mean() * n_trading_days_per_year, 2)
    global_strategy_stats["Annualized Volatility"] = acct_ccy_day_pnl.std() * np.sqrt(n_trading_days_per_year)
    if abs(global_strategy_stats["Annualized Volatility"]) < 1e-20:
        global_strategy_stats["Information Ratio"] = float("nan")
    else:
        global_strategy_stats["Information Ratio"] = (
            global_strategy_stats["Annual Return"] / global_strategy_stats["Annualized Volatility"]
        )

    return global_strategy_stats


def trading_step(
    session_dict: Dict[str, Any],
    trading_handler=None,
    risk_rules=None,
    apply_dividends=False,
    reverse_signal=False,
    start_idx=0,
    end_idx=None,
    is_trades_stats_needed=True,
) -> Generator[int, None, None]:
    """Run the trading code bar by bar and store the resulting statistics in the session.

    This is a generator: it yields the progress in percent (at most 99 while running and
    100 once every result has been written to ``session_dict``).

    Parameters:
        session_dict: session state; read for market data, limits and settings and
            updated with "trading_stats_by_symbol", "strategy_stats",
            "global_strategy_stats", the alerts and (optionally) the trade statistics.
        trading_handler: optional callable used instead of the trading code found in the
            last "indicators_dialogue" entry.
        risk_rules: optional rules passed to ``RiskRuleProcessor``.
        apply_dividends: include dividends in the P&L.
        reverse_signal: negate the quantities requested by the trading code.
        start_idx, end_idx: window of the time line to simulate (``end_idx`` exclusive,
            defaults to the whole time line).
        is_trades_stats_needed: also compute per-trade statistics.
    """
    if trading_handler is not None:
        trading_code = inspect.getsource(trading_handler)
        # Empty first code section followed by the trading code section.
        # NOTE(review): the exact whitespace of this template was lost in the flattened
        # source; confirm get_code_sections accepts this layout.
        llm_response = f"```python\n```\n```python\n{trading_code}\n```"
    else:
        llm_response = session_dict["indicators_dialogue"][-1]

    _, trading_code = get_code_sections(llm_response)
    session_dict["trading_code"] = trading_code

    time_line = session_dict["time_line"]
    fx_rates, fx_rate_symbol_by_symbol, fx_rates_info = _resolve_fx(session_dict)
    dividends_by_symbol = get_sparse_dividends_for_each_tradable_symbol(session_dict) if apply_dividends else None

    tradable_symbols = session_dict["tradable_symbols"]
    bet_size = session_dict["bet_size"]
    total_gross_limit = session_dict["total_gross_limit"]
    nop_limit = session_dict["nop_limit"]
    per_instrument_gross_limit = session_dict["per_instrument_gross_limit"]

    risk_processor = None
    if risk_rules is not None:
        risk_processor = RiskRuleProcessor(
            betSize=bet_size, grossLimit=total_gross_limit, nopLimit=nop_limit, instrumentGrossLimit=per_instrument_gross_limit
        )
        risk_processor.addRiskRules(risk_rules)

    strategy_pnl_stats = {"acct_ccy_pnl": DefaultList([], 0.0)}

    # A missing, None or zero cost means "no execution cost".
    execution_cost_bps = session_dict.get("execution_cost_bps") or 0.0
    apply_execution_cost = execution_cost_bps != 0.0

    default_value_by_property = {
        "quantity": 0.0,
        "quote_ccy_cost": 0.0,
        "acct_ccy_cost": 0.0,
        "quote_ccy_total_cost": 0.0,
        "acct_ccy_total_cost": 0.0,
        "total_size": 0.0,
        "quote_ccy_value": 0.0,
        "acct_ccy_value": 0.0,
        "quote_ccy_pnl": 0.0,
        "quote_ccy_day_pnl": 0.0,
        "acct_ccy_pnl": 0.0,
        "acct_ccy_day_pnl": 0.0,
        "buy_alert": False,
        "sell_alert": False,
    }
    if apply_dividends:
        default_value_by_property["cum_dividends"] = 0.0
        default_value_by_property["acct_ccy_cum_dividends"] = 0.0

    if end_idx is None:
        end_idx = len(time_line)

    fill_on_same_bar = session_dict["fill_trade_price"] == "day_close"
    if fill_on_same_bar:
        # The decision bar and the fill bar coincide.
        check_len = end_idx
        check_range = range(start_idx, end_idx)
        trade_range = range(start_idx, end_idx)
    else:
        # The decision is taken on bar i and filled on bar i + 1.
        check_len = end_idx - 1
        check_range = range(start_idx, end_idx - 1)
        trade_range = range(start_idx + 1, end_idx)

    # Slot 0 holds the initial state; the result of decision bar i is stored in slot i + 1.
    trading_stats_by_symbol = {
        s: {p: np.full(check_len + 1, v, dtype=type(v)) for p, v in default_value_by_property.items()}
        for s in tradable_symbols
    }

    default_value_by_strategy_property = {
        "acct_ccy_total_cost": 0.0,
        "acct_ccy_value": 0.0,
        "gross_value": 0.0,
        "acct_ccy_pnl": 0.0,
        "acct_ccy_day_pnl": 0.0,
    }
    strategy_stats = {p: np.full(check_len + 1, v, dtype=type(v)) for p, v in default_value_by_strategy_property.items()}

    array_close_price_by_symbol = {key: session_dict["data_by_symbol"][key]["close"].to_numpy() for key in tradable_symbols}
    array_fx_rates = {key: value["close"].to_numpy() for key, value in fx_rates.items()}
    array_dividends_by_symbol = (
        {key: value.to_numpy() for key, value in dividends_by_symbol.items()} if apply_dividends else {}
    )

    # TODO: the fill-price lookup below was flagged "need to rewrite this" by its author.
    list_trade_fill_price_by_symbol = _fill_prices_by_symbol(session_dict, tradable_symbols)

    comp_trading_code = compile_code(trading_code)

    def calculate_properties(s, quantity, check_idx, idx):
        """Compute the state of symbol ``s`` after trading ``quantity`` on fill bar ``idx``."""
        previous = trading_stats_by_symbol[s]
        fx_rate = array_fx_rates[fx_rate_symbol_by_symbol[s]][idx]

        total_size = quantity + previous["total_size"][check_idx]
        quote_ccy_value = total_size * array_close_price_by_symbol[s][idx]
        acct_ccy_value = quote_ccy_value / fx_rate
        buy_alert = quantity > 1e-10
        sell_alert = quantity < -1e-10

        fill_price = list_trade_fill_price_by_symbol[s][idx]
        if apply_execution_cost:
            # Buys fill above and sells fill below the reference price; no trade, no price.
            if buy_alert:
                fill_price = fill_price + abs(fill_price) * execution_cost_bps / 10000
            elif sell_alert:
                fill_price = fill_price - abs(fill_price) * execution_cost_bps / 10000
            else:
                fill_price = 0.0
        quote_ccy_cost = -quantity * fill_price
        acct_ccy_cost = quote_ccy_cost / fx_rate
        quote_ccy_total_cost = quote_ccy_cost + previous["quote_ccy_total_cost"][check_idx]
        acct_ccy_total_cost = acct_ccy_cost + previous["acct_ccy_total_cost"][check_idx]

        quote_ccy_pnl = quote_ccy_total_cost + quote_ccy_value
        acct_ccy_pnl = acct_ccy_total_cost + acct_ccy_value
        if apply_dividends:
            cur_dividends = total_size * array_dividends_by_symbol[s][idx] if s in array_dividends_by_symbol else 0.0
            cum_dividends = previous["cum_dividends"][check_idx] + cur_dividends
            acct_ccy_cum_dividends = previous["acct_ccy_cum_dividends"][check_idx] + cur_dividends / fx_rate
            quote_ccy_pnl += cum_dividends
            acct_ccy_pnl += acct_ccy_cum_dividends

        properties = {
            "quote_ccy_cost": quote_ccy_cost,
            "acct_ccy_cost": acct_ccy_cost,
            "quote_ccy_total_cost": quote_ccy_total_cost,
            "acct_ccy_total_cost": acct_ccy_total_cost,
            "total_size": total_size,
            "quote_ccy_value": quote_ccy_value,
            "acct_ccy_value": acct_ccy_value,
            "quote_ccy_pnl": quote_ccy_pnl,
            "quote_ccy_day_pnl": quote_ccy_pnl - previous["quote_ccy_pnl"][check_idx],
            "acct_ccy_pnl": acct_ccy_pnl,
            "acct_ccy_day_pnl": acct_ccy_pnl - previous["acct_ccy_pnl"][check_idx],
            "buy_alert": buy_alert,
            "sell_alert": sell_alert,
        }
        if apply_dividends:
            properties["cum_dividends"] = cum_dividends
            properties["acct_ccy_cum_dividends"] = acct_ccy_cum_dividends
        return properties

    def reset_if_nan(properties, quantity):
        """Fall back to the default state (and zero quantity) when any property is NaN."""
        if any(np.isnan(value) for value in properties.values()):
            return {k: default_value_by_property[k] for k in properties}, 0.0
        return properties, quantity

    def calculate_strategy_properties(check_idx):
        """Aggregate the freshly written per-symbol values into strategy-level values."""
        row = check_idx + 1

        def column_sum(name):
            return sum([trading_stats_by_symbol[s][name][row] for s in tradable_symbols])

        return {
            "acct_ccy_total_cost": column_sum("acct_ccy_total_cost"),
            "acct_ccy_value": column_sum("acct_ccy_value"),
            "gross_value": sum([np.abs(trading_stats_by_symbol[s]["acct_ccy_value"][row]) for s in tradable_symbols]),
            "acct_ccy_pnl": column_sum("acct_ccy_pnl"),
            "acct_ccy_day_pnl": column_sum("acct_ccy_day_pnl"),
        }

    # Running exposure totals, updated incrementally symbol by symbol.
    cur_sum_value = 0.0
    pos_value = 0.0
    neg_value = 0.0

    # Namespace in which the trading code is executed.
    lclsglbls = session_dict.get("lclsglbls", get_globals())
    if lclsglbls.get("data_by_symbol", None) is None:
        for key in session_dict["data_by_symbol"]:
            lclsglbls[key] = session_dict["data_by_symbol"][key]
        for key in session_dict["data_by_synth"]:
            lclsglbls[key] = session_dict["data_by_synth"][key]
        lclsglbls["data_by_symbol"] = session_dict["data_by_symbol"]
    if lclsglbls.get("supplementary_symbols", None) is None:
        lclsglbls["supplementary_symbols"] = session_dict["supplementary_symbols"]
    if lclsglbls.get("tradable_symbols", None) is None:
        lclsglbls["tradable_symbols"] = session_dict["tradable_symbols"]

    lclsglbls["fx_rates"] = fx_rates_info
    lclsglbls["time_line"] = time_line
    lclsglbls["trading_info"] = trading_stats_by_symbol
    lclsglbls["strategy_info"] = strategy_stats
    lclsglbls["bet_size"] = bet_size  # do not remove; required for backward compatibility of old models

    # max(..., 1): a single-bar window must not divide by zero.
    all_iterations_num = max(end_idx - 1, 1)
    prev_percent, current_percent = 0, 0

    if trading_handler is None:
        lclsglbls["print"], session_dict["trading_code_log"] = get_print_redirect()

    for check_idx, idx in zip(check_range, trade_range):
        current_percent = min(idx * 100 // all_iterations_num, 99)
        if current_percent != prev_percent:
            yield current_percent

        lclsglbls["order"] = {s: {"size": 0, "unit_of_measure": "base units"} for s in tradable_symbols}
        lclsglbls["idx"] = check_idx
        lclsglbls["bet_size"] = bet_size

        if risk_processor is not None:
            # The arrays are preallocated, so the latest known value lives at check_idx
            # (index -1 is the still-empty last slot).
            strategy_pnl_stats["acct_ccy_pnl"].append(
                sum([trading_stats_by_symbol[s]["acct_ccy_pnl"][check_idx] for s in tradable_symbols])
            )
            risk_processor.applyRiskRules(trading_stats_by_symbol, strategy_pnl_stats, time_line[idx], time_line)

            # set limits according to risk rules
            nop_limit = risk_processor.NOPLimit
            total_gross_limit = risk_processor.GrossLimit
            lclsglbls["default_bet_size_by_symbol"] = {
                s: min(risk_processor.getSymbolDetails(s).BetSize, risk_processor.BetSize) for s in tradable_symbols
            }
            per_instrument_gross_limit_by_symbol = {
                s: min(risk_processor.getSymbolDetails(s).InstrumentGrossLimit, risk_processor.InstrumentGrossLimit)
                for s in tradable_symbols
            }
        else:
            lclsglbls["default_bet_size_by_symbol"] = {s: bet_size for s in tradable_symbols}
            per_instrument_gross_limit_by_symbol = {s: per_instrument_gross_limit for s in tradable_symbols}

        if trading_handler is not None:
            handler_func_with_updated_globals = types.FunctionType(trading_handler.__code__, lclsglbls, trading_handler.__name__)
            lclsglbls.update(handler_func_with_updated_globals())
        else:
            exec_code(trading_code, lclsglbls, lclsglbls, comp_trading_code)
        # Drop "__builtins__" if execution added it; pop() tolerates its absence.
        lclsglbls.pop("__builtins__", None)

        prev_percent = current_percent

        for s in tradable_symbols:
            symbol_stats = trading_stats_by_symbol[s]
            previous_value = symbol_stats["acct_ccy_value"][check_idx]

            # do not trade if suspended by risk rule
            suspend_trading = risk_processor is not None and risk_processor.isTradingSuspended(s)

            # close position by stop loss, or because this is the last bar of the window
            force_close_positon = (suspend_trading and previous_value != 0) or idx == end_idx - 1
            if force_close_positon:
                lclsglbls["order"][s] = {
                    "size": -symbol_stats["total_size"][check_idx],
                    "unit_of_measure": "base units",
                }

            if pd.isna(lclsglbls["order"][s]["size"]):
                lclsglbls["order"][s]["size"] = 0

            if lclsglbls["order"][s]["unit_of_measure"] == "base units":
                quantity = np.round(lclsglbls["order"][s]["size"])
            else:
                # Size given as an account-currency amount: convert to units.
                quantity = np.round(
                    (lclsglbls["order"][s]["size"])
                    / (array_close_price_by_symbol[s][check_idx] / array_fx_rates[fx_rate_symbol_by_symbol[s]][check_idx])
                )

            # A forced close already targets the real position, so it must not be reversed
            # (reversing it would double the position instead of flattening it).
            if reverse_signal and not force_close_positon:
                quantity = -quantity

            properties, quantity = reset_if_nan(calculate_properties(s, quantity, check_idx, idx), quantity)

            is_abs_total_size_decreases = np.abs(properties["total_size"]) < np.abs(symbol_stats["total_size"][check_idx])

            # Remove this symbol's previous contribution from the running totals.
            cur_sum_value = cur_sum_value - np.abs(previous_value)
            pos_value = pos_value - previous_value * (previous_value > 0)
            neg_value = neg_value - np.abs(previous_value) * (previous_value < 0)

            new_value = properties["acct_ccy_value"]
            NOP_pos_value = pos_value + (new_value if new_value > 0 else 0)
            NOP_neg_value = neg_value + (-new_value if new_value < 0 else 0)
            NOP_value = np.maximum(NOP_pos_value, NOP_neg_value)

            is_total_gross_limit_met = (cur_sum_value + np.abs(new_value)) > total_gross_limit
            is_nop_limit_met = NOP_value > nop_limit
            is_per_instrument_gross_limit_met = np.abs(new_value) > per_instrument_gross_limit_by_symbol[s]
            is_any_limit_met = is_total_gross_limit_met or is_nop_limit_met or is_per_instrument_gross_limit_met

            # Trades that reduce the position are always allowed; others are dropped when
            # trading is suspended or a limit would be breached.
            if (suspend_trading and not force_close_positon) or (not is_abs_total_size_decreases and is_any_limit_met):
                quantity = 0.0
                properties, quantity = reset_if_nan(calculate_properties(s, quantity, check_idx, idx), quantity)

            symbol_stats["quantity"][check_idx + 1] = quantity
            for key, value in properties.items():
                symbol_stats[key][check_idx + 1] = value

            # Add this symbol's new contribution to the running totals.
            written_value = symbol_stats["acct_ccy_value"][check_idx + 1]
            cur_sum_value = cur_sum_value + np.abs(written_value)
            pos_value = pos_value + written_value * (written_value > 0)
            neg_value = neg_value + np.abs(written_value) * (written_value < 0)

        for key, value in calculate_strategy_properties(check_idx).items():
            strategy_stats[key][check_idx + 1] = value

    if fill_on_same_bar:
        # Drop the initial-state slot so that row i lines up with time_line[i].
        trading_stats_by_symbol = {
            s: {key: trading_stats_by_symbol[s][key][1:] for key in trading_stats_by_symbol[s]}
            for s in trading_stats_by_symbol
        }
        strategy_stats = {key: strategy_stats[key][1:] for key in strategy_stats}

    trading_stats_by_symbol = {
        s: pd.DataFrame(trading_stats_by_symbol[s], index=time_line[:end_idx]).iloc[start_idx:]
        for s in trading_stats_by_symbol
    }
    session_dict["long_alert"] = {s: trading_stats_by_symbol[s]["buy_alert"] for s in trading_stats_by_symbol}
    session_dict["short_alert"] = {s: trading_stats_by_symbol[s]["sell_alert"] for s in trading_stats_by_symbol}

    # copy(): a new column is added below, which must not target a slice of another frame.
    strategy_stats = pd.DataFrame(strategy_stats, index=time_line[:end_idx]).iloc[start_idx:].copy()
    strategy_stats["NOP_value"] = _net_open_position(trading_stats_by_symbol, tradable_symbols)

    global_strategy_stats = get_global_strategy_stats(trading_stats_by_symbol, strategy_stats, tradable_symbols, time_line)

    session_dict["trading_stats_by_symbol"] = trading_stats_by_symbol
    session_dict["strategy_stats"] = strategy_stats
    session_dict["global_strategy_stats"] = global_strategy_stats

    if is_trades_stats_needed:
        cum_dividends_by_symbol = None
        cum_acct_ccy_dividends_by_symbol = None
        if apply_dividends:
            cum_dividends_by_symbol, cum_acct_ccy_dividends_by_symbol = _cumulative_dividends(
                dividends_by_symbol, tradable_symbols, fx_rates, fx_rate_symbol_by_symbol, start_idx, end_idx
            )

        trade_results = get_trade_stats(
            trading_stats_by_symbol,
            tradable_symbols,
            {key: value[start_idx:end_idx] for key, value in list_trade_fill_price_by_symbol.items()},
            {key: value[start_idx:end_idx] for key, value in fx_rates_info.items()},
            execution_cost_bps if apply_execution_cost else None,
            apply_dividends,
            cum_dividends_by_symbol,
            cum_acct_ccy_dividends_by_symbol,
        )
        # Store the results before reporting 100 %, so a consumer that stops iterating at
        # 100 still finds them in the session.
        session_dict.update(zip(_TRADE_RESULT_KEYS, trade_results))

    yield 100


def get_combined_trading_statistics(
    session_dict,
    list_trading_stats_by_symbol,
    list_strategy_stats,
    start_idx=0,
    end_idx=None,
    apply_dividends=False,
):
    """
    Returns a dictionary of combined statistics

    Parameters:
        session_dict: session, which contains all the fields we need, like
            execution_cost_bps, actual_currency, fx_rates, etc.
        list_trading_stats_by_symbol: python list of trading_stats_by_symbol from
            session_dicts which statistics we want to combine
            ([s["trading_stats_by_symbol"] for s in many_session_dicts]).
        list_strategy_stats: python list of strategy_stats from session_dicts which
            statistics we want to combine.
        start_idx, end_idx: window of the time line used for fill prices, FX rates and
            dividends (end_idx exclusive, defaults to the whole time line).
        apply_dividends: same as apply_dividends of these sessions

    The per-symbol and strategy frames are combined by element-wise addition, starting
    from a copy of the first element so that the inputs are left untouched.
    """
    time_line = session_dict["time_line"]
    tradable_symbols = session_dict["tradable_symbols"]
    fx_rates, fx_rate_symbol_by_symbol, fx_rates_info = _resolve_fx(session_dict)
    dividends_by_symbol = get_sparse_dividends_for_each_tradable_symbol(session_dict) if apply_dividends else None

    # A missing, None or zero cost means "no execution cost".
    execution_cost_bps = session_dict.get("execution_cost_bps") or 0.0
    apply_execution_cost = execution_cost_bps != 0.0

    if end_idx is None:
        end_idx = len(time_line)

    first_trading_stats = list_trading_stats_by_symbol[0]
    trading_stats_by_symbol = {symbol: first_trading_stats[symbol].copy() for symbol in first_trading_stats}
    for t_stats_by_symbol in list_trading_stats_by_symbol[1:]:
        for symbol in trading_stats_by_symbol:
            trading_stats_by_symbol[symbol] += t_stats_by_symbol[symbol]

    strategy_stats = list_strategy_stats[0].copy()
    for s_stats in list_strategy_stats[1:]:
        strategy_stats += s_stats

    # The summed NOP column is meaningless; recompute it from the combined positions.
    strategy_stats["NOP_value"] = _net_open_position(trading_stats_by_symbol, tradable_symbols)

    result = {
        "long_alert": {s: trading_stats_by_symbol[s]["buy_alert"] for s in trading_stats_by_symbol},
        "short_alert": {s: trading_stats_by_symbol[s]["sell_alert"] for s in trading_stats_by_symbol},
        "trading_stats_by_symbol": trading_stats_by_symbol,
        "strategy_stats": strategy_stats,
        "global_strategy_stats": get_global_strategy_stats(
            trading_stats_by_symbol, strategy_stats, tradable_symbols, time_line
        ),
    }

    list_trade_fill_price_by_symbol = _fill_prices_by_symbol(session_dict, tradable_symbols)

    cum_dividends_by_symbol = None
    cum_acct_ccy_dividends_by_symbol = None
    if apply_dividends:
        cum_dividends_by_symbol, cum_acct_ccy_dividends_by_symbol = _cumulative_dividends(
            dividends_by_symbol, tradable_symbols, fx_rates, fx_rate_symbol_by_symbol, start_idx, end_idx
        )

    trade_results = get_trade_stats(
        trading_stats_by_symbol,
        tradable_symbols,
        {key: value[start_idx:end_idx] for key, value in list_trade_fill_price_by_symbol.items()},
        {key: value[start_idx:end_idx] for key, value in fx_rates_info.items()},
        execution_cost_bps if apply_execution_cost else None,
        apply_dividends,
        cum_dividends_by_symbol,
        cum_acct_ccy_dividends_by_symbol,
    )
    result.update(zip(_TRADE_RESULT_KEYS, trade_results))
    return result