databricks/lib/spark_helper/ground_truth.py (53 lines of code) (raw):

import json
from datetime import datetime
from typing import Any, Dict

from lib.spark_helper.db_service import SparkDBService
from lib.spark_helper.storage_service import SparkStorageService


class GroundTruthDBStorage:
    """Records ground-truth revision metadata (file_id, revision_id,
    is_latest, create_date) in the `ground_truth` table via SparkDBService."""

    TABLE_NAME = "ground_truth"
    COLUMNS = {
        "file_id": "INT",
        "revision_id": "STRING",
        "is_latest": "BOOLEAN",
        "create_date": "TIMESTAMP",
    }

    def __init__(self, configs: Dict[str, Any]):
        self.db_service = SparkDBService(configs)
        self.create_db_resources()

    def create_db_resources(self) -> None:
        self.db_service.create_table_if_not_exists(self.TABLE_NAME, self.COLUMNS)

    def set_is_latest_false(self, file_id: int) -> None:
        # Mark every currently-latest revision of this file as superseded.
        self.db_service.update_table(
            table_name=self.TABLE_NAME,
            set="is_latest = FALSE",
            filters=f"file_id = {file_id} and is_latest = TRUE",
        )

    def insert_revision(self, file_id: int, revision_id: str) -> None:
        # "TRUE" is passed as a SQL literal for the is_latest BOOLEAN column.
        self.db_service.insert_table(
            table_name=self.TABLE_NAME,
            values=[file_id, revision_id, "TRUE", datetime.now()],
        )


class GroundTruthFileStorage:
    """Reads and writes per-revision annotation JSON files in the
    `ground_truth` volume via SparkStorageService."""

    VOLUME_NAME = "ground_truth"
    STORAGE_PATH = VOLUME_NAME + "/{file_id}/{revision_id}.json"

    def __init__(self, configs: Dict[str, Any]):
        self.storage_service = SparkStorageService(configs)
        self.create_storage_resources()

    def create_storage_resources(self) -> None:
        self.storage_service.create_volume_if_not_exists(self.VOLUME_NAME)

    def write_annotations_to_json(
        self, file_id: int, revision_id: str, annotations: Dict[str, str]
    ) -> None:
        # Serialize the annotations and write them to
        # <volume>/<file_id>/<revision_id>.json.
        self.storage_service.write_text(
            data=json.dumps(annotations, indent=4),
            file_path=self.STORAGE_PATH.format(
                file_id=file_id, revision_id=revision_id
            ),
        )

    def read_revision_file(self, file_id: int, revision_id: str) -> Any:
        # Load the stored JSON for a specific revision back into a Python object.
        fpath = self.STORAGE_PATH.format(
            file_id=file_id, revision_id=revision_id
        )
        return json.loads(self.storage_service.read_text(fpath))
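

# --- Usage sketch (illustrative, not part of the original module) -----------
# A minimal example of how the two classes above might be combined to record a
# new ground-truth revision. The `configs` contents shown here are assumptions;
# whatever SparkDBService and SparkStorageService actually expect (catalog,
# schema, etc.) should be supplied instead.
if __name__ == "__main__":
    configs: Dict[str, Any] = {"catalog": "main", "schema": "ml"}  # hypothetical keys

    db_storage = GroundTruthDBStorage(configs)
    file_storage = GroundTruthFileStorage(configs)

    file_id, revision_id = 42, "rev-001"
    annotations = {"sentiment": "positive"}  # example annotation payload

    # Supersede any earlier revision, register the new one, then persist its JSON.
    db_storage.set_is_latest_false(file_id)
    db_storage.insert_revision(file_id, revision_id)
    file_storage.write_annotations_to_json(file_id, revision_id, annotations)

    # Round-trip check: read the revision file back.
    assert file_storage.read_revision_file(file_id, revision_id) == annotations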