# databricks/lib/spark_helper/storage_service.py
import os
from pathlib import Path
from typing import Any, Dict, List
from databricks.sdk.runtime import spark
class SparkStorageService:
def __init__(self, configs: Dict[str, Any]):
self._configs = configs
self._catalog = self._configs["databricks"]["catalog"]
self._schema = self._configs["databricks"]["schema"]
def create_volume_if_not_exists(self, volume_name: str) -> None:
create_volume = f"CREATE VOLUME IF NOT EXISTS {self._catalog}.{self._schema}.{volume_name}"
spark.sql(create_volume)
def list_files(self, file_path: Path | str) -> List[Path]:
file_path = (
Path(f"/Volumes/{self._catalog}/{self._schema}") / file_path
)
return list(file_path.iterdir())
def read_text(self, file_path: Path | str) -> str:
file_path = (
Path(f"/Volumes/{self._catalog}/{self._schema}") / file_path
)
with open(file_path, "r") as file:
return file.read()
def write_text(self, data: str, file_path: Path | str) -> None:
file_path = (
Path(f"/Volumes/{self._catalog}/{self._schema}") / file_path
)
os.makedirs(file_path.parent, exist_ok=True)
with open(file_path, "w") as file:
file.write(data)
def write_binary(self, data: bytes, file_path: Path | str) -> None:
file_path = (
Path(f"/Volumes/{self._catalog}/{self._schema}") / file_path
)
os.makedirs(file_path.parent, exist_ok=True)
with open(file_path, "wb") as file:
file.write(data)