osci/datalake/blob/base.py

"""Copyright since 2020, EPAM Systems This file is part of OSCI. OSCI is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OSCI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>.""" import yaml from osci.utils import get_pandas_data_frame_info, get_azure_blob_connection_string from osci.datalake.base import BaseDataLakeArea from azure.storage.blob import BlobServiceClient, ContainerClient, ContentSettings from azure.core.exceptions import ResourceNotFoundError from io import BytesIO, StringIO import logging import pandas as pd import pyarrow.parquet as pq log = logging.getLogger(__name__) class BlobArea(BaseDataLakeArea): AREA_CONTAINER = None @property def _github_events_commits_base(self): return 'github/events/push' @property def _github_repositories_base(self): return 'github/repository' _github_raw_events_commits_base = 'github/raw-events/push' def __init__(self, storage_account_name: str, storage_account_key: str, *args, area_container: str = AREA_CONTAINER, **kwargs): super().__init__(*args, **kwargs) self.AREA_CONTAINER = area_container self.storage_account_name = storage_account_name self.storage_account_key = storage_account_key connection_string = get_azure_blob_connection_string(account_name=self.storage_account_name, account_key=self.storage_account_key) self.blob_service: BlobServiceClient = BlobServiceClient.from_connection_string(conn_str=connection_string) self.container_client: ContainerClient = self.blob_service.get_container_client(container=self.AREA_CONTAINER) log.debug(f'BLOB Data Lake area {self.AREA_CONTAINER} created') def add_fs_prefix(self, path: str) -> str: return f'wasbs://{self.AREA_CONTAINER}@{self.storage_account_name}.blob.core.windows.net/{path}' def add_http_prefix(self, path: str) -> str: return f'https://{self.storage_account_name}.blob.core.windows.net/{self.AREA_CONTAINER}/{path}' def write_pandas_dataframe_to_parquet(self, df: pd.DataFrame, path: str, index=False): """Writes pandas dataframe to parquet to blob :param df: dataframe to write :param path: blob name to save :param index: write index column """ with BytesIO() as buffer: log.debug(f"Save pandas df to {self.AREA_CONTAINER}/{path}; df.info: {get_pandas_data_frame_info(df)}") df.to_parquet(buffer, engine='pyarrow', index=index) container_client = self.blob_service.get_container_client(container=self.AREA_CONTAINER) container_client.upload_blob(name=path, data=buffer.getvalue(), overwrite=True) def read_pandas_dataframe_from_parquet(self, path: str) -> pd.DataFrame: """Load parquet data from azure blob to pandas dataframe :param path: blob_name to load :return: pandas dataframe from blob """ blob_client = self.blob_service.get_blob_client(container=self.AREA_CONTAINER, blob=path) try: with BytesIO(blob_client.download_blob().readall()) as buffer: return pq.read_table(source=buffer).to_pandas() except ResourceNotFoundError as ex: log.error(f"ResourceNotFound {ex}") def write_pandas_dataframe_to_csv(self, df: pd.DataFrame, path: str, index=False): """Writes pandas dataframe to csv to blob :param df: dataframe to write :param path: 
blob name to save :param index: write index column """ with StringIO() as buffer: log.debug(f"Save pandas df to {self.AREA_CONTAINER}/{path}; df.info: {get_pandas_data_frame_info(df)}") df.to_csv(buffer, index=index) container_client = self.blob_service.get_container_client(container=self.AREA_CONTAINER) container_client.upload_blob(name=path, data=buffer.getvalue(), overwrite=True) def read_pandas_dataframe_from_csv(self, path: str, dtype=None) -> pd.DataFrame: """Load parquet data from azure blob to pandas dataframe :param path: blob_name to load :param dtype: Type name or dict of column -> type, optional :return: pandas dataframe from blob """ blob_client = self.blob_service.get_blob_client(container=self.AREA_CONTAINER, blob=path) try: with StringIO(blob_client.download_blob().readall().decode()) as buffer: return pd.read_csv(buffer, dtype=dtype) except ResourceNotFoundError as ex: log.error(f"ResourceNotFound {ex}") def write_bytes_to_file(self, path: str, buffer: BytesIO): container_client = self.blob_service.get_container_client(container=self.AREA_CONTAINER) container_client.upload_blob(name=path, data=buffer.getvalue(), overwrite=True) def write_string_to_file(self, path: str, data: str, content_type: str = 'application/octet-stream'): with StringIO(data) as buffer: container_client = self.blob_service.get_container_client(container=self.AREA_CONTAINER) container_client.upload_blob(name=path, data=buffer.getvalue(), overwrite=True, content_settings=ContentSettings(content_type=content_type)) def read_yaml_file(self, path: str): log.info(f'Read yml from path: {path}') blob_client = self.blob_service.get_blob_client(container=self.AREA_CONTAINER, blob=path) try: with StringIO(blob_client.download_blob().readall().decode()) as buffer: return yaml.load(buffer, Loader=yaml.FullLoader) except ResourceNotFoundError as ex: log.error(f"ResourceNotFound {ex}")
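
A minimal usage sketch of BlobArea, assuming valid Azure Storage credentials and that BaseDataLakeArea needs no extra constructor arguments. The account name, key, container name, and blob path below are hypothetical placeholders, not values from the OSCI codebase.

import pandas as pd

from osci.datalake.blob.base import BlobArea

# Hypothetical credentials and container; substitute real values.
area = BlobArea(storage_account_name='mystorageaccount',   # placeholder account
                storage_account_key='base64-account-key',  # placeholder key
                area_container='landing')                  # placeholder container

# Round-trip a small dataframe through the container as parquet.
df = pd.DataFrame({'repo': ['osci'], 'commits': [42]})
area.write_pandas_dataframe_to_parquet(df, path='github/events/push/sample.parquet')
restored = area.read_pandas_dataframe_from_parquet(path='github/events/push/sample.parquet')

# Derive the wasbs:// URI (e.g. for Spark) and the HTTPS URL for the same blob.
print(area.add_fs_prefix('github/events/push/sample.parquet'))
print(area.add_http_prefix('github/events/push/sample.parquet'))

Note that the read methods log a ResourceNotFoundError and return None when the blob is missing, so callers should check the result for None rather than catching the Azure exception themselves.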