in src/backend/entrypoints/lambdas/lambda_run.py [0:0]
def handle_alert(lambda_event: Dict[str, Any], lambda_session: Dict[str, Any], flow: List[Callable]) -> Tuple[str, str]:
from market_alerts.config import NOTIFICATION_METHODS
from market_alerts.domain.constants import (
DATA_PERIODICITIES_NAMES_TO_BACKEND_KEY,
DATA_TIME_RANGES_NAMES_TO_BACKEND_KEY,
DATA_TIME_RANGES_NAMES_TO_VALUES,
)
from market_alerts.domain.services import notify
from market_alerts.domain.services.notifier import EmailMethod
from market_alerts.utils import ms_to_string
lambda_session = {
"data_provider": lambda_event["data_source"],
"tradable_symbols_prompt": lambda_event["tradable_symbols_prompt"],
"supplementary_symbols_prompt": lambda_event["supplementary_symbols_prompt"],
"fx_rates": lambda_event["fx_rates"],
"indicators_query": lambda_event["indicators_prompt"],
"indicators_logic": lambda_event["indicators_logic"],
"alerts_query": lambda_event["alerts_prompt"],
"alerts_logic": lambda_event["alerts_logic"],
"datasets_keys": lambda_event["datasets"] if lambda_event.get("datasets") is not None else ["Prices"],
"interval": DATA_PERIODICITIES_NAMES_TO_BACKEND_KEY[ms_to_string(lambda_event["periodicity"])],
"time_period": DATA_TIME_RANGES_NAMES_TO_BACKEND_KEY[ms_to_string(lambda_event["time_range"])],
"indicators_dialogue": [lambda_event["indicators_prompt"]],
"alerts_dialogue": [lambda_event["alerts_prompt"]],
**lambda_session,
}
end_date = datetime.now()
start_date = (end_date - DATA_TIME_RANGES_NAMES_TO_VALUES[ms_to_string(lambda_event["time_range"])]).strftime(
"%Y-%m-%d %H:%M:%S"
)
end_date = end_date.strftime("%Y-%m-%d %H:%M:%S")
lambda_session["start_date"] = start_date
lambda_session["end_date"] = end_date
for func in flow:
func(lambda_session)
is_alert_series = lambda_session["trigger_alert"]
is_alert = int(is_alert_series.iloc[-1])
if is_alert:
notification_methods = [
method
for method in [
EmailMethod(
recipients=lambda_event["alert_emails"],
subject=lambda_event["title"],
body=build_notification_body(lambda_event, lambda_session),
),
]
if method.METHOD_NAME in [m.strip() for m in NOTIFICATION_METHODS.split(";")]
]
notify(notification_methods)
return "ALERTED", build_notification_body(lambda_event, lambda_session)
else:
return "FINISHED", "Alert was not triggered"