jobs/jobs/databricks_utils.py (83 lines of code) (raw):

import dataclasses
import json
import logging
import os
from datetime import datetime
from typing import Iterator, List

import jobs.pipeline as pipeline
from databricks.sdk import WorkspaceClient
from jobs.schemas import Pipeline

logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")

# Connection settings for the Databricks workspace, read once at import time.
DATABRICKS_SERVICE_SCHEME = os.getenv("DATABRICKS_SERVICE_SCHEME", "http")
DATABRICKS_SERVICE_HOST = os.getenv("DATABRICKS_SERVICE_HOST")
DATABRICKS_SERVICE_PORT = os.getenv("DATABRICKS_SERVICE_PORT")


def get_client() -> WorkspaceClient:
    """Build a ``WorkspaceClient`` from the module-level connection settings.

    The host URL has the form ``<scheme>://<host>[:<port>]``. The port is
    appended only when ``DATABRICKS_SERVICE_PORT`` is set to a non-empty value.

    Raises:
        KeyError: if the ``DATABRICKS_KEY`` environment variable is not set.
    """
    # NOTE(review): if DATABRICKS_SERVICE_HOST is unset, the URL becomes
    # "<scheme>://None" and the failure only surfaces on the first request.
    url = f"{DATABRICKS_SERVICE_SCHEME}://{DATABRICKS_SERVICE_HOST}"
    if DATABRICKS_SERVICE_PORT:
        url = f"{url}:{DATABRICKS_SERVICE_PORT}"
    return WorkspaceClient(host=url, token=os.environ["DATABRICKS_KEY"])


def get_pipelines() -> Iterator[Pipeline]:
    """Yield one ``Pipeline`` record per job returned by ``client.jobs.list()``.

    This is a generator: the client is not created, and the jobs API is not
    called, until iteration starts.
    """
    client = get_client()
    for job in client.jobs.list():
        # Guard a job whose ``settings`` is None. The unguarded attribute
        # access would raise AttributeError and abort the whole listing. The
        # name is rendered as "None", the same as a missing ``settings.name``.
        settings = job.settings
        job_name = settings.name if settings is not None else None
        yield Pipeline(
            id=job.job_id,
            type="databricks",
            version=1,
            date=datetime.today(),  # todo: change to created_time
            meta={},
            name=f"{job_name}",
        )


async def run(
    pipeline_id: str,
    job_id: int,
    files: List[pipeline.PipelineFile],
    current_tenant: str,
    datasets: List[pipeline.Dataset],
    revisions: List[str],
) -> None:
    """Trigger the Databricks job ``pipeline_id`` via ``client.jobs.run_now``.

    All run arguments are packed into a ``pipeline.PipelineRunArgs``,
    converted with ``dataclasses.asdict``, JSON-encoded and passed as the
    single job parameter ``badgerdoc_job_parameters``.

    Args:
        pipeline_id: ID of the Databricks job to run.
        job_id: BadgerDoc job ID, forwarded inside the run arguments.
        files: Files to process, forwarded as ``files_data``.
        current_tenant: Tenant name, forwarded as ``tenant``.
        datasets: Datasets forwarded inside the run arguments.
        revisions: Revisions forwarded inside the run arguments.
    """
    logger.info(
        "Running pipeline %s, job_id %s, current_tenant: %s with arguments %s",
        pipeline_id,
        job_id,
        current_tenant,
        files,
    )
    client = get_client()
    run_args = pipeline.PipelineRunArgs(
        job_id=job_id,
        tenant=current_tenant,
        files_data=files,
        datasets=datasets,
        revisions=revisions,
    )
    # NOTE(review): run_now is called without ``await``, so the request runs
    # synchronously and blocks the event loop until it returns.
    client.jobs.run_now(
        pipeline_id,
        job_parameters={
            "badgerdoc_job_parameters": json.dumps(
                dataclasses.asdict(run_args)
            )
        },
    )


class DatabricksPipeline(pipeline.BasePipeline):
    """``BasePipeline`` implementation backed by Databricks jobs."""

    async def list(self) -> List[pipeline.AnyPipeline]:
        """Return every Databricks job as a ``Pipeline`` record.

        The generator from ``get_pipelines`` is materialized into a list.
        This makes the result match the annotated ``List`` return type. It
        also means the API calls (and any errors) happen inside this
        coroutine, not later when the caller iterates.
        """
        # todo: bind to AnyPipeline
        # Unpacking into a list literal sidesteps any confusion with this
        # method's own name.
        return [*get_pipelines()]

    async def run(
        self,
        pipeline_id: str,
        job_id: str,
        files: List[pipeline.PipelineFile],
        current_tenant: str,
        datasets: List[pipeline.Dataset],
        revisions: List[str],
    ) -> None:
        """Run Databricks job ``pipeline_id`` for BadgerDoc job ``job_id``.

        ``job_id`` arrives as a string and is converted to ``int`` before
        delegating to the module-level ``run`` coroutine.

        Raises:
            ValueError: if ``job_id`` is not an integer string.
        """
        # Class-scope names are not visible inside method bodies, so ``run``
        # here resolves to the module-level coroutine, not this method.
        await run(
            pipeline_id,
            int(job_id),
            files,
            current_tenant,
            datasets=datasets,
            revisions=revisions,
        )