# aidial_interceptors_sdk/utils/storage.py (46 lines of code) (raw):
import io
import logging
from typing import Mapping
from urllib.parse import urljoin
import httpx
from aidial_sdk.pydantic_v1 import BaseModel
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
class FileStorage(BaseModel):
    """Upload and download files addressed by DIAL URLs.

    A link counts as a DIAL URL when, after being resolved against
    ``{dial_url}/v1/``, the result still starts with that base. Links may
    therefore be given either relative to the base or as absolute URLs
    under it. Every request carries the ``api-key`` header.
    """

    class Config:
        # ``httpx.AsyncClient`` is not a pydantic-native type, so arbitrary
        # field types have to be allowed explicitly.
        arbitrary_types_allowed = True

    # URL that ``/v1/`` is appended to in order to form the base URL;
    # trailing slashes are tolerated.
    dial_url: str
    # Value sent in the ``api-key`` request header.
    api_key: str
    # Client used to perform the HTTP requests.
    http_client: httpx.AsyncClient

    @property
    def headers(self) -> Mapping[str, str]:
        """Return the headers attached to every request."""
        return {"api-key": self.api_key}

    async def upload(
        self, url: str, content_type: str | None, content: bytes
    ) -> None:
        """Upload ``content`` to ``url`` with a multipart ``PUT`` request.

        Args:
            url: Absolute or base-relative DIAL URL of the target file.
            content_type: Content type of the uploaded part, or ``None``.
            content: Raw bytes to upload.

        Raises:
            ValueError: If ``url`` does not resolve under the DIAL base URL.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        abs_url = self._require_abs_dial_url(url)

        response = await self.http_client.put(
            url=abs_url,
            # The absolute URL doubles as the multipart file name.
            files={"file": (abs_url, io.BytesIO(content), content_type)},
            headers=self.headers,
        )
        response.raise_for_status()

        meta = response.json()
        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        _log.debug("uploaded file: url=%r, metadata=%s", abs_url, meta)

    def to_dial_url(self, link: str) -> str | None:
        """Return ``link`` relative to the DIAL base URL.

        Returns:
            The part of the resolved link that follows ``{dial_url}/v1/``,
            or ``None`` when the resolved link is not under that base.
        """
        abs_url = self._to_abs_url(link)
        base_url = self._base_url()
        if abs_url.startswith(base_url):
            return abs_url.removeprefix(base_url)
        return None

    def _base_url(self) -> str:
        """Return ``{dial_url}/v1/`` with exactly one slash before ``v1``."""
        # Without the strip, a ``dial_url`` ending in "/" would produce
        # "...//v1/", which absolute links under ".../v1/" never match.
        return f"{self.dial_url.rstrip('/')}/v1/"

    def _to_abs_url(self, link: str) -> str:
        """Resolve ``link`` against the DIAL base URL."""
        return urljoin(self._base_url(), link)

    def _require_abs_dial_url(self, url: str) -> str:
        """Return the absolute form of ``url``, rejecting non-DIAL URLs.

        Raises:
            ValueError: If ``url`` does not resolve under the DIAL base URL.
        """
        abs_url = self._to_abs_url(url)
        if not abs_url.startswith(self._base_url()):
            raise ValueError(f"URL isn't DIAL url: {url!r}")
        return abs_url

    async def download(self, url: str) -> bytes:
        """Download the file at ``url`` and return its raw bytes.

        Args:
            url: Absolute or base-relative DIAL URL of the file.

        Raises:
            ValueError: If ``url`` does not resolve under the DIAL base URL.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        abs_url = self._require_abs_dial_url(url)
        response = await self.http_client.get(abs_url, headers=self.headers)
        response.raise_for_status()
        return response.content