databricks/lib/spark_helper/db_service.py (25 lines of code) (raw):

from typing import Any, Dict, List from databricks.sdk.runtime import spark class SparkDBService: def __init__(self, configs: Dict[str, Any]): self._configs = configs self._catalog = self._configs["databricks"]["catalog"] self._schema = self._configs["databricks"]["schema"] def create_table_if_not_exists( self, table_name: str, columns: Dict[str, str] ) -> None: columns_str = ", ".join( [f"{col} {col_type}" for col, col_type in columns.items()] ) create_table = ( f"CREATE TABLE IF NOT EXISTS {self._catalog}.{self._schema}.{table_name} " f"({columns_str})" ) spark.sql(create_table) def update_table(self, table_name: str, set: str, filters: str) -> None: query = f""" UPDATE {self._catalog}.{self._schema}.{table_name} SET {set} WHERE {filters} """ spark.sql(query) def insert_table(self, table_name: str, values: List[Any]) -> None: placeholders = ", ".join(["?"] * len(values)) query = f""" INSERT INTO {self._catalog}.{self._schema}.{table_name} VALUES ({placeholders}) """ spark.sql(query, values)