src/backend/domain/data_providers/polygon.py (216 lines of code) (raw):
import json
from datetime import datetime
from string import Template
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import pandas as pd
import requests

from market_alerts.domain.exceptions import (
    DataNotFoundError,
    MethodIsNotImplementedError,
)

from .default import DefaultDataProvider
class PolygonDataProvider(DefaultDataProvider):
    """Data provider that reads prices, dividends and ticker info from api.polygon.io.

    Every public method reshapes the Polygon payload into the "twelvedata"
    dictionary layout produced by the ``_convert_*_to_twelvedata_format``
    helpers, so callers receive the same structure regardless of provider.
    """

    PROVIDER_ID = "PI"
    PROVIDER_NAME = "PolygonIO"

    # Seconds to wait on a Polygon request. Without an explicit timeout
    # ``requests`` may block forever on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    POLYGON_PRICES_URL_TEMPLATE = Template(
        "https://api.polygon.io/v2/aggs/ticker/$symbol/range/$multiplier/$timespan/$start_date/$end_date?sort=asc&apiKey=$api_key"
    )
    POLYGON_SYMBOL_SEARCH_TEMPLATE = Template(
        "https://api.polygon.io/v3/reference/tickers?search=$symbol&limit=$outputsize&apiKey=$polygon_api_key"
    )
    POLYGON_DIVIDENDS_URL_TEMPLATE = Template(
        "https://api.polygon.io/v3/reference/dividends?ticker=$symbol&apiKey=$polygon_api_key"
    )
    POLYGON_SYMBOL_META_TEMPLATE = Template(
        "https://api.polygon.io/v3/reference/tickers?ticker=$symbol&apiKey=$polygon_api_key"
    )

    # Unit spellings rewritten to Polygon timespan names. Matching is exact on
    # purpose: a substring test for "h" also matches "month".
    _TIMESPAN_ALIASES = {
        "h": "hour",
        "hour": "hour",
        "hours": "hour",
        "min": "minute",
        "mins": "minute",
        "minute": "minute",
        "minutes": "minute",
    }

    def __init__(self, polygon_api_key: str):
        """Store the API key that is appended to every Polygon request."""
        super().__init__()
        self.api_key = polygon_api_key

    def _fetch_json(self, url: str) -> Dict[str, Any]:
        """GET *url* and return the decoded JSON body."""
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        return json.loads(response.content.decode())

    @staticmethod
    def _raise_on_bad_request(payload: Dict[str, Any]) -> None:
        """Raise ``DataNotFoundError`` when *payload* carries ``code == 400``."""
        if payload.get("code") == 400:
            raise DataNotFoundError(payload.get("message") or "Polygon rejected the request (code 400).")

    def _with_api_key(self, next_url: str) -> str:
        """Return a pagination URL with the API key appended exactly once."""
        return f"{next_url}&apiKey={self.api_key}"

    def get_split_adjusted_prices(
        self, symbol: str, start_date: str, end_date: str, interval: str
    ) -> Tuple[Dict[str, Any], Optional[str]]:
        """Fetch aggregate bars for *symbol*, following ``next_url`` pagination.

        Args:
            symbol: Ticker inserted into the aggregates URL.
            start_date: Range start; only the part before the first whitespace is used.
            end_date: Range end; only the part before the first whitespace is used.
            interval: Periodicity such as ``"1day"``, ``"1h"`` or ``"5min"``.

        Returns:
            ``(prices, error_message)`` where ``prices`` has ``meta`` and
            ``values`` keys and ``error_message`` is always ``None``.

        Raises:
            DataNotFoundError: A page reports ``code == 400`` or the first
                response is empty.
            ValueError: *interval* has no numeric multiplier.
        """
        multiplier, timespan = self._convert_periodicity_to_polygon_multiplier_timespan(interval)
        url = self.POLYGON_PRICES_URL_TEMPLATE.substitute(
            api_key=self.api_key,
            symbol=symbol,
            start_date=start_date.split()[0],
            end_date=end_date.split()[0],
            multiplier=multiplier,
            timespan=timespan,
        )

        page = self._fetch_json(url)
        self._raise_on_bad_request(page)
        if not page:
            # Previously this path fell through to an unbound local variable.
            raise DataNotFoundError(f"Polygon returned no price data for symbol '{symbol}'.")

        converted_prices = self._convert_prices_to_twelvedata_format(page, None)
        next_url = page.get("next_url")
        while next_url is not None:
            page = self._fetch_json(self._with_api_key(next_url))
            self._raise_on_bad_request(page)
            converted_prices = self._convert_prices_to_twelvedata_format(page, converted_prices)
            # An empty page has no "next_url", which ends the loop.
            next_url = page.get("next_url")

        return converted_prices, None

    def get_dividends(self, symbol: str, start_date: str, end_date: str) -> Tuple[Dict[str, Any], Optional[str]]:
        """Fetch dividends for *symbol* whose ex-date lies in ``[start_date, end_date]``.

        Pages are requested until one contains a payment with an ex-date on or
        before *start_date*, or until Polygon stops returning ``next_url``.

        Returns:
            ``(dividends, error_message)`` where ``dividends`` has ``meta`` and
            ``dividends`` keys and ``error_message`` is always ``None``.

        Raises:
            DataNotFoundError: Any fetched page reports ``code == 400``.
        """
        start_date = start_date.split()[0]
        end_date = end_date.split()[0]

        url = self.POLYGON_DIVIDENDS_URL_TEMPLATE.substitute(polygon_api_key=self.api_key, symbol=symbol)
        data_dividends = self._fetch_json(url)
        self._raise_on_bad_request(data_dividends)
        # Guarantee a list so the accumulation below never hits a missing key.
        data_dividends["results"] = data_dividends.get("results") or []

        while not self._check_dividends_are_full(start_date, data_dividends):
            next_url = data_dividends.get("next_url")
            if next_url is None:
                break
            next_page = self._fetch_json(self._with_api_key(next_url))
            self._raise_on_bad_request(next_page)
            data_dividends["results"].extend(next_page.get("results") or [])
            data_dividends["next_url"] = next_page.get("next_url")

        data_dividends["results"] = self._trim_dividends_if_too_much_data(start_date, end_date, data_dividends)
        return self._convert_dividends_to_twelvedata_format(data_dividends), None

    @staticmethod
    def _check_dividends_are_full(start_date: str, data_dividends: Dict[str, Any]) -> bool:
        """Return True when no further dividend pages need to be fetched.

        That is the case for an empty payload, or when at least one fetched
        payment has an ex-dividend date on or before *start_date*.
        NOTE(review): this presumably relies on Polygon returning newest
        payments first - confirm against the API's default ordering.
        """
        if not data_dividends:
            return True
        start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
        return any(
            datetime.strptime(payment["ex_dividend_date"], "%Y-%m-%d") <= start_datetime
            for payment in data_dividends.get("results") or []
        )

    @staticmethod
    def _trim_dividends_if_too_much_data(
        start_date: str, end_date: str, data_dividends: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Return the payments whose ex-dividend date is within ``[start_date, end_date]``."""
        start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
        end_datetime = datetime.strptime(end_date, "%Y-%m-%d")
        return [
            payment
            for payment in data_dividends.get("results") or []
            if start_datetime <= datetime.strptime(payment["ex_dividend_date"], "%Y-%m-%d") <= end_datetime
        ]

    @staticmethod
    def _convert_dividends_to_twelvedata_format(data_dividends: Dict[str, Any]) -> Dict[str, Any]:
        """Reshape a Polygon dividends payload into ``{"meta": ..., "dividends": [...]}``.

        ``meta.symbol`` and ``meta.currency`` are taken from the first payment
        that provides a non-None value; the other meta fields stay empty.
        """
        converted_result = {
            "meta": {
                "symbol": None,
                "name": "",
                "currency": None,
                "exchange": "",
                "mic_code": "",
                "exchange_timezone": "",
            },
            "dividends": [],
        }
        meta = converted_result["meta"]
        for payment in data_dividends.get("results") or []:
            if meta["symbol"] is None:
                meta["symbol"] = payment["ticker"]
            if meta["currency"] is None:
                meta["currency"] = payment["currency"]
            converted_result["dividends"].append(
                {
                    "ex_date": payment["ex_dividend_date"],
                    "amount": payment["cash_amount"],
                }
            )
        return converted_result

    @staticmethod
    def _convert_periodicity_to_polygon_multiplier_timespan(periodicity: str) -> Tuple[int, str]:
        """Split a periodicity such as ``"5min"`` into ``(5, "minute")``.

        Digits form the multiplier and the remaining characters form the unit.
        Hour and minute spellings are mapped to Polygon's names; any other
        unit (``"day"``, ``"week"``, ``"month"``, ...) is passed through as is.

        Raises:
            ValueError: *periodicity* contains no digits.
        """
        digits = "".join(char for char in periodicity if char.isdigit())
        unit = "".join(char for char in periodicity if not char.isdigit())
        if not digits:
            raise ValueError(f"Periodicity {periodicity!r} has no numeric multiplier.")
        timespan = PolygonDataProvider._TIMESPAN_ALIASES.get(unit, unit)
        return int(digits), timespan

    def _convert_prices_to_twelvedata_format(
        self, polygon_timeseries: Dict[str, Any], format_container: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Append the bars of one Polygon page to a ``{"meta", "values"}`` container.

        When *format_container* is ``None`` a new container is created and its
        meta is looked up for ``polygon_timeseries["ticker"]``; otherwise the
        given container is extended in place and returned.
        """
        if format_container is not None:
            result = format_container
        else:
            result = {
                "meta": self._get_ticker_meta(polygon_timeseries["ticker"]),
                "values": [],
            }

        for bar in polygon_timeseries.get("results", []):
            # "t" is divided by 1000, i.e. treated as epoch milliseconds.
            # NOTE(review): fromtimestamp() converts using the server's local
            # timezone and the time of day is discarded, so the resulting date
            # can differ between hosts - confirm this is intended.
            bar_date = datetime.fromtimestamp(bar["t"] / 1000).strftime("%Y-%m-%d")
            result["values"].append(
                {
                    "datetime": bar_date,
                    "open": bar["o"],
                    "high": bar["h"],
                    "low": bar["l"],
                    "close": bar["c"],
                    "volume": bar["v"],
                }
            )
        return result

    def search_ticker(self, tickers_query: str, output_limit: int = 1000) -> List[Dict[str, str]]:
        """Search Polygon tickers matching *tickers_query*.

        The query is expected as raw text and is percent-encoded here, so
        characters such as ``&`` or ``#`` (e.g. ``"AT&T"``) cannot truncate the
        search term or alter the other query parameters.

        Returns:
            One "twelvedata"-style dictionary per match; empty when the
            response carries no results.
        """
        url = self.POLYGON_SYMBOL_SEARCH_TEMPLATE.substitute(
            symbol=quote(tickers_query, safe=""),
            outputsize=output_limit,
            polygon_api_key=self.api_key,
        )
        data = self._fetch_json(url)
        return self._convert_ticker_info_to_twelvedate_format(data.get("results") or [])

    @staticmethod
    def _convert_ticker_info_to_twelvedate_format(ticker_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Map Polygon ticker records to the symbol-search layout; absent fields become ``""``."""
        return [
            {
                "symbol": ticker["ticker"],
                "instrument_name": ticker.get("name", ""),
                "exchange": ticker.get("primary_exchange", ""),
                "mic_code": "",
                "exchange_timezone": "",
                "instrument_type": ticker.get("type", ""),
                "country": ticker.get("locale", ""),
                "currency": ticker.get("currency_name", ""),
            }
            for ticker in ticker_data
        ]

    def _get_ticker_meta(self, symbol: str) -> Dict[str, Any]:
        """Build the ``meta`` section for *symbol* from the first reference record.

        Falls back to empty strings when Polygon returns no record. The
        currency is always reported as ``"USD"``.
        """
        url = self.POLYGON_SYMBOL_META_TEMPLATE.substitute(polygon_api_key=self.api_key, symbol=symbol)
        data = self._fetch_json(url)
        results = data.get("results") or []
        details = results[0] if results else {}
        # NOTE(review): "exchange_timezone" is filled from "locale" and "type"
        # from "name" - kept as is for compatibility, confirm it is intended.
        return {
            "symbol": symbol,
            "interval": "",
            "currency": "USD",
            "exchange_timezone": details.get("locale", ""),
            "exchange": details.get("primary_exchange", ""),
            "mic_code": "",
            "type": details.get("name", ""),
        }

    def get_earnings(self, symbol: str, start_date: str, end_date: str) -> Dict[str, Any]:
        """Return an empty mapping; this provider does not fetch earnings."""
        return {}

    def get_balance_sheets(self, symbols: List[str], true_symbols: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Return two empty mappings; this provider does not fetch balance sheets."""
        return {}, {}

    def get_income_statements(self, symbols: List[str], true_symbols: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Return two empty mappings; this provider does not fetch income statements."""
        return {}, {}

    def get_economic_indicators(self, symbol: str, start_date: str, end_date: str) -> Dict[str, Any]:
        """Economic indicators are not available from this provider.

        Raises:
            MethodIsNotImplementedError: Always. The exception used to be
                constructed without ``raise``, so the call returned ``None``.
        """
        raise MethodIsNotImplementedError(
            "Method get_economic_indicators is not implemented for polygon io data provider."
        )