pipe-cli/src/utilities/storage/azure.py (382 lines of code) (raw):

# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import io
import os
from threading import Lock
import time
from datetime import timedelta, datetime

from src.model.datastorage_usage_model import StorageUsage
from src.utilities.audit import DataAccessEvent, DataAccessType
from src.utilities.encoding_utilities import to_string
from src.utilities.storage.storage_usage import StorageUsageAccumulator

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2

from azure.storage.blob import BlockBlobService, ContainerPermissions, Blob
from azure.storage.common._auth import _StorageSASAuthentication

from src.api.data_storage import DataStorage
from src.model.data_storage_item_model import DataStorageItemModel, DataStorageItemLabelModel
from src.model.data_storage_tmp_credentials_model import TemporaryCredentialsModel
from src.utilities.patterns import PatternMatcher
from src.utilities.storage.common import StorageOperations, AbstractTransferManager, AbstractListingManager, \
    AbstractDeleteManager, UploadResult, TransferResult
from src.utilities.progress_bar import ProgressPercentage
from src.config import Config


class AzureProgressPercentage(ProgressPercentage):

    def __init__(self, filename, size):
        super(AzureProgressPercentage, self).__init__(filename, size)
        self._total_bytes = 0

    def __call__(self, bytes_amount):
        newest_bytes = bytes_amount - self._total_bytes
        self._total_bytes = bytes_amount
        super(AzureProgressPercentage, self).__call__(newest_bytes)

    @staticmethod
    def callback(source_key, size, quiet, lock=None):
        if not StorageOperations.show_progress(quiet, size, lock):
            return None
        progress = AzureProgressPercentage(source_key, size)
        return lambda current, _: progress(current)


class AzureManager:

    def __init__(self, blob_service, events=None):
        self.service = blob_service
        self.events = events

    def get_max_connections(self, io_threads):
        return max(io_threads, 1) if io_threads is not None else 2


class AzureListingManager(AzureManager, AbstractListingManager):
    DEFAULT_PAGE_SIZE = StorageOperations.DEFAULT_PAGE_SIZE

    def __init__(self, blob_service, bucket):
        super(AzureListingManager, self).__init__(blob_service)
        self.bucket = bucket
        self.delimiter = StorageOperations.PATH_SEPARATOR

    def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
                   show_all=False, show_archive=False):
        prefix = StorageOperations.get_prefix(relative_path)
        blobs_generator = self.service.list_blobs(self.bucket.path,
                                                  prefix=prefix if relative_path else None,
                                                  num_results=page_size if not show_all else None,
                                                  delimiter=StorageOperations.PATH_SEPARATOR if not recursive else None)
        absolute_items = [self._to_storage_item(blob) for blob in blobs_generator]
        return absolute_items if recursive else [self._to_local_item(item, prefix) for item in absolute_items]

    def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE,
                          start_token=None, show_archive=False):
        return self.list_items(relative_path, recursive, page_size, show_all=False,
                               show_archive=show_archive), None

    def get_summary(self, relative_path=None):
        prefix = StorageOperations.get_prefix(relative_path)
        blobs_generator = self.service.list_blobs(self.bucket.path,
                                                  prefix=prefix if relative_path else None)
        storage_usage = StorageUsage()
        for blob in blobs_generator:
            if type(blob) == Blob:
                storage_usage.add_item(AbstractListingManager.STANDARD_TIER, blob.properties.content_length)
        return [self.delimiter.join([self.bucket.path, relative_path]), storage_usage]

    def get_summary_with_depth(self, max_depth, relative_path=None):
        prefix = StorageOperations.get_prefix(relative_path)
        blobs_generator = self.service.list_blobs(self.bucket.path,
                                                  prefix=prefix if relative_path else None)
        accumulator = StorageUsageAccumulator(self.bucket.path, relative_path, self.delimiter, max_depth)
        for blob in blobs_generator:
            if type(blob) == Blob:
                size = blob.properties.content_length
                name = blob.name
                accumulator.add_path(name, AbstractListingManager.STANDARD_TIER, size)
        return accumulator.get_tree()

    def get_listing_with_depth(self, max_depth, relative_path=None):
        raise NotImplementedError("List items with depth is not implemented yet")

    def _to_storage_item(self, blob):
        item = DataStorageItemModel()
        item.name = blob.name
        item.path = item.name
        if type(blob) == Blob:
            item.type = 'File'
            item.changed = self._to_local_timezone(blob.properties.last_modified)
            item.size = blob.properties.content_length
            item.labels = [DataStorageItemLabelModel('StorageClass', blob.properties.blob_tier.upper())]
        else:
            item.type = 'Folder'
        return item

    def _to_local_timezone(self, utc_datetime):
        return utc_datetime.astimezone(Config.instance().timezone())

    def _to_local_item(self, absolute_item, prefix):
        relative_item = copy.deepcopy(absolute_item)
        relative_item.name = StorageOperations.get_item_name(relative_item.name, prefix)
        relative_item.path = relative_item.name
        return relative_item

    def get_file_tags(self, relative_path):
        return dict(self.service.get_blob_metadata(self.bucket.path, relative_path))
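

# Deletes single blobs or whole "folders" (common prefixes), applying include/exclude glob filters
# per blob. Azure blob versioning is not integrated here, so version-specific and hard deletes are
# rejected up front, and object tags are removed through the DataStorage API client in batches
# after the blobs themselves are deleted.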
class AzureDeleteManager(AzureManager, AbstractDeleteManager):

    def __init__(self, blob_service, events, bucket):
        super(AzureDeleteManager, self).__init__(blob_service, events)
        self.bucket = bucket
        self.delimiter = StorageOperations.PATH_SEPARATOR
        self.listing_manager = AzureListingManager(self.service, self.bucket)

    def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False,
                     page_size=None):
        if version or hard_delete:
            raise RuntimeError('Versioning is not supported by AZURE cloud provider')
        prefix = StorageOperations.get_prefix(relative_path)
        check_file = True
        if prefix.endswith(self.delimiter):
            prefix = prefix[:-1]
            check_file = False
        if not recursive:
            deleted = self.__delete_blob(prefix, exclude, include)
            if deleted:
                self._delete_all_object_tags([prefix])
        else:
            blob_names_for_deletion = []
            for item in self.listing_manager.list_items(prefix, recursive=True, show_all=True):
                if item.name == prefix and check_file:
                    blob_names_for_deletion = [item.name]
                    break
                if self.__file_under_folder(item.name, prefix):
                    blob_names_for_deletion.append(item.name)
            deleted_blob_names = []
            for blob_name in blob_names_for_deletion:
                deleted = self.__delete_blob(blob_name, exclude, include, prefix=prefix)
                if deleted:
                    deleted_blob_names.append(blob_name)
            self._delete_all_object_tags(deleted_blob_names)

    def _delete_all_object_tags(self, blob_names_for_deletion, chunk_size=100):
        for blob_names_for_deletion_chunk in [blob_names_for_deletion[i:i + chunk_size]
                                              for i in range(0, len(blob_names_for_deletion), chunk_size)]:
            DataStorage.batch_delete_all_object_tags(self.bucket.identifier,
                                                     [{'path': blob_name}
                                                      for blob_name in blob_names_for_deletion_chunk])

    def __file_under_folder(self, file_path, folder_path):
        return StorageOperations.without_prefix(file_path, folder_path).startswith(self.delimiter)

    def __delete_blob(self, blob_name, exclude, include, prefix=None):
        file_name = blob_name
        if prefix:
            relative_file_name = StorageOperations.get_item_name(blob_name, prefix=prefix + self.delimiter)
            file_name = StorageOperations.get_prefix(relative_file_name)
        if not PatternMatcher.match_any(file_name, include):
            return False
        if PatternMatcher.match_any(file_name, exclude, default=False):
            return False
        self.events.put(DataAccessEvent(blob_name, DataAccessType.DELETE, storage=self.bucket))
        self.service.delete_blob(self.bucket.path, blob_name)
        return True
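

# Server-side copy between Azure containers. Blobs smaller than _SYNC_COPY_SIZE_LIMIT are copied
# with a synchronous copy_blob request; larger blobs fall back to an asynchronous copy whose
# status is polled every _POLLS_TIMEOUT seconds until it reaches a terminal state or _POLLS_LIMIT
# is exhausted.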
class TransferBetweenAzureBucketsManager(AzureManager, AbstractTransferManager):
    _COPY_SUCCESS_STATUS = 'success'
    _COPY_PENDING_STATUS = 'pending'
    _COPY_ABORTED_STATUS = 'aborted'
    _COPY_FAILED_STATUS = 'failed'
    _COPY_TERMINAL_STATUSES = [_COPY_SUCCESS_STATUS, _COPY_ABORTED_STATUS, _COPY_FAILED_STATUS]
    _SYNC_COPY_SIZE_LIMIT = (256 - 1) * 1024 * 1024  # 255 Mb
    _POLLS_TIMEOUT = 10  # 10 seconds
    _POLLS_LIMIT = 60 * 60 * 3  # 3 hours
    _POLLS_ATTEMPTS = _POLLS_LIMIT // _POLLS_TIMEOUT

    def get_destination_key(self, destination_wrapper, relative_path):
        return StorageOperations.normalize_path(destination_wrapper, relative_path)

    def get_destination_size(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_size(destination_key)

    def get_destination_object_head(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_object_head(destination_key)

    def get_source_key(self, source_wrapper, source_path):
        return source_path

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
                 size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False):
        full_path = path
        destination_path = self.get_destination_key(destination_wrapper, relative_path)
        source_service = AzureBucketOperations.get_blob_service(source_wrapper.bucket, read=True, write=clean)
        source_credentials = source_service.credentials
        source_blob_url = self.service.make_blob_url(source_wrapper.bucket.path, full_path,
                                                     sas_token=source_credentials.session_token.lstrip('?'))
        destination_bucket = destination_wrapper.bucket.path
        sync_copy = size < TransferBetweenAzureBucketsManager._SYNC_COPY_SIZE_LIMIT if size else None
        progress_callback = AzureProgressPercentage.callback(full_path, size, quiet, lock)
        if progress_callback:
            progress_callback(0, size)
        self.events.put_all([DataAccessEvent(full_path, DataAccessType.READ, storage=source_wrapper.bucket),
                             DataAccessEvent(destination_path, DataAccessType.WRITE,
                                             storage=destination_wrapper.bucket)])
        self.service.copy_blob(destination_bucket, destination_path, source_blob_url, requires_sync=sync_copy)
        if not sync_copy:
            self._wait_for_copying(destination_bucket, destination_path, full_path)
        if progress_callback:
            progress_callback(size, size)
        if clean:
            self.events.put(DataAccessEvent(full_path, DataAccessType.DELETE, storage=source_wrapper.bucket))
            source_service.delete_blob(source_wrapper.bucket.path, full_path)
        return TransferResult(source_key=full_path, destination_key=destination_path, destination_version=None,
                              tags=StorageOperations.parse_tags(tags))

    def _wait_for_copying(self, destination_bucket, destination_path, full_path):
        for _ in range(0, TransferBetweenAzureBucketsManager._POLLS_ATTEMPTS):
            time.sleep(TransferBetweenAzureBucketsManager._POLLS_TIMEOUT)
            copying_status = self._get_copying_status(destination_bucket, destination_path)
            if copying_status in TransferBetweenAzureBucketsManager._COPY_TERMINAL_STATUSES:
                if copying_status == TransferBetweenAzureBucketsManager._COPY_SUCCESS_STATUS:
                    return
                else:
                    raise RuntimeError('Blob copying from %s to %s has failed.' % (full_path, destination_path))
        raise RuntimeError('Blob copying from %s to %s has failed.' % (full_path, destination_path))

    def _get_copying_status(self, destination_bucket, destination_path):
        blob = self.service.get_blob_properties(destination_bucket, destination_path)
        return blob.properties.copy.status
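

# Blob-to-local-file download built on get_blob_to_path. The destination key is resolved either as
# a file inside the destination directory (when the local path ends with a separator) or as the
# destination path itself.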
class AzureDownloadManager(AzureManager, AbstractTransferManager):

    def get_destination_key(self, destination_wrapper, relative_path):
        if destination_wrapper.path.endswith(os.path.sep):
            return os.path.join(destination_wrapper.path, relative_path)
        else:
            return destination_wrapper.path

    def get_destination_size(self, destination_wrapper, destination_key):
        return StorageOperations.get_local_file_size(destination_key)

    def get_destination_object_head(self, destination_wrapper, destination_key):
        return StorageOperations.get_local_file_size(destination_key), \
               StorageOperations.get_local_file_modification_datetime(destination_key)

    def get_source_key(self, source_wrapper, source_path):
        return source_path or source_wrapper.path

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
                 size=None, tags=None, io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False):
        source_key = self.get_source_key(source_wrapper, path)
        destination_key = self.get_destination_key(destination_wrapper, relative_path)
        self.create_local_folder(destination_key, lock)
        progress_callback = AzureProgressPercentage.callback(source_key, size, quiet, lock)
        self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket))
        self.service.get_blob_to_path(source_wrapper.bucket.path, source_key, to_string(destination_key),
                                      progress_callback=progress_callback)
        if clean:
            self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket))
            self.service.delete_blob(source_wrapper.bucket.path, source_key)


class AzureUploadManager(AzureManager, AbstractTransferManager):

    def get_destination_key(self, destination_wrapper, relative_path):
        return StorageOperations.normalize_path(destination_wrapper, relative_path)

    def get_destination_size(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_size(destination_key)

    def get_destination_object_head(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_object_head(destination_key)

    def get_source_key(self, source_wrapper, source_path):
        if source_path:
            return os.path.join(source_wrapper.path, source_path)
        else:
            return source_wrapper.path

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
                 size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False):
        source_key = self.get_source_key(source_wrapper, path)
        destination_key = self.get_destination_key(destination_wrapper, relative_path)
        destination_tags = StorageOperations.generate_tags(tags, source_key)
        progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet, lock)
        max_connections = self.get_max_connections(io_threads)
        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
        self.service.create_blob_from_path(destination_wrapper.bucket.path, destination_key, to_string(source_key),
                                           metadata=destination_tags,
                                           progress_callback=progress_callback,
                                           max_connections=max_connections)
        if clean:
            source_wrapper.delete_item(source_key)
        return UploadResult(source_key=source_key, destination_key=destination_key, destination_version=None,
                            tags=destination_tags)
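

# _SourceUrlIO wraps a urlopen() response in the minimal file-like interface expected by
# create_blob_from_stream, so http/ftp sources can be streamed into Azure without first
# downloading them to the local filesystem.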
class _SourceUrlIO(io.BytesIO):

    def __init__(self, url):
        super(_SourceUrlIO, self).__init__()
        self.io = urlopen(url)

    def read(self, n=10):
        return self.io.read(n)


class TransferFromHttpOrFtpToAzureManager(AzureManager, AbstractTransferManager):

    def get_destination_key(self, destination_wrapper, relative_path):
        if destination_wrapper.path.endswith(os.path.sep):
            return os.path.join(destination_wrapper.path, relative_path)
        else:
            return destination_wrapper.path

    def get_destination_size(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_size(destination_key)

    def get_destination_object_head(self, destination_wrapper, destination_key):
        return destination_wrapper.get_list_manager().get_file_object_head(destination_key)

    def get_source_key(self, source_wrapper, source_path):
        return source_path or source_wrapper.path

    def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False,
                 size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False):
        if clean:
            raise AttributeError('Cannot perform \'mv\' operation: deleting remote files '
                                 'is not supported for ftp/http sources.')
        source_key = self.get_source_key(source_wrapper, path)
        destination_key = self.get_destination_key(destination_wrapper, relative_path)
        destination_tags = StorageOperations.generate_tags(tags, source_key)
        progress_callback = AzureProgressPercentage.callback(relative_path, size, quiet, lock)
        max_connections = self.get_max_connections(io_threads)
        self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket))
        self.service.create_blob_from_stream(destination_wrapper.bucket.path, destination_key,
                                             _SourceUrlIO(source_key),
                                             metadata=destination_tags,
                                             progress_callback=progress_callback,
                                             max_connections=max_connections)
        return UploadResult(source_key=source_key, destination_key=destination_key, destination_version=None,
                            tags=destination_tags)


class AzureTemporaryCredentials:
    AZURE_STORAGE_ACCOUNT = 'AZURE_STORAGE_ACCOUNT'
    AZURE_STORAGE_KEY = 'AZURE_STORAGE_KEY'
    SAS_TOKEN = 'SAS_TOKEN'

    @classmethod
    def from_azure_sdk(cls, bucket, read, write):
        storage_account = os.environ[AzureTemporaryCredentials.AZURE_STORAGE_ACCOUNT]
        if AzureTemporaryCredentials.SAS_TOKEN not in os.environ:
            storage_account_key = os.environ[AzureTemporaryCredentials.AZURE_STORAGE_KEY]

            client = BlockBlobService(account_name=storage_account, account_key=storage_account_key)
            generation_date = datetime.utcnow()
            expiration_date = generation_date + timedelta(hours=1)
            print('SAS token generation date: %s' % generation_date)
            print('SAS token expiration date: %s' % expiration_date)
            permission = ContainerPermissions(True, False, False, True)
            sas_token = client.generate_account_shared_access_signature('sco', permission=permission,
                                                                        expiry=expiration_date,
                                                                        start=generation_date)
        else:
            sas_token = os.environ[AzureTemporaryCredentials.SAS_TOKEN]
            expiration_date = datetime.utcnow() + timedelta(hours=1)

        credentials = TemporaryCredentialsModel()
        credentials.region = "eu-central-1"
        credentials.access_key_id = None
        credentials.secret_key = storage_account
        credentials.session_token = sas_token
        credentials.expiration = expiration_date
        return credentials

    @classmethod
    def from_cp_api(cls, bucket, read, write):
        return DataStorage.get_single_temporary_credentials(bucket=bucket.identifier, read=read, write=write)
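

# BlockBlobService that re-issues its temporary SAS credentials once they are within
# refresh_timeout minutes of expiring. By default credentials are requested from the Cloud
# Pipeline API (from_cp_api); from_azure_sdk is an environment-variable based alternative.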
class RefreshingBlockBlobService(BlockBlobService):

    def __init__(self, bucket, read, write, refresh_timeout=15,
                 refresh_credentials=AzureTemporaryCredentials.from_cp_api):
        self.refresh_timeout = refresh_timeout
        self.refresh_credentials = lambda: refresh_credentials(bucket, read, write)
        self.credentials = self.refresh_credentials()
        self.refresh_credentials_lock = Lock()
        super(RefreshingBlockBlobService, self).__init__(account_name=self.credentials.secret_key,
                                                         sas_token=self.credentials.session_token)

    def _perform_request(self, request, parser=None, parser_args=None, operation_context=None, expected_errors=None):
        with self.refresh_credentials_lock:
            if self._expired(self.credentials):
                self.credentials = self.refresh_credentials()
                self.sas_token = self.credentials.session_token
                self.authentication = _StorageSASAuthentication(self.credentials.session_token)
        return super(RefreshingBlockBlobService, self)._perform_request(request, parser, parser_args,
                                                                        operation_context, expected_errors)

    def _expired(self, credentials):
        return credentials.expiration - datetime.utcnow() < timedelta(minutes=self.refresh_timeout)


class ProxyBlockBlobService(RefreshingBlockBlobService):

    def _apply_host(self, request, operation_context, retry_context):
        super(ProxyBlockBlobService, self)._apply_host(request, operation_context, retry_context)
        request_url = self.protocol + '://' + request.host
        self._httpclient.proxies = StorageOperations.get_proxy_config(request_url)


class AzureBucketOperations:

    @classmethod
    def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, events, command):
        blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
        return TransferBetweenAzureBucketsManager(blob_service, events)

    @classmethod
    def get_download_manager(cls, source_wrapper, destination_wrapper, events, command):
        blob_service = cls.get_blob_service(source_wrapper.bucket, read=True, write=command == 'mv')
        return AzureDownloadManager(blob_service, events)

    @classmethod
    def get_upload_manager(cls, source_wrapper, destination_wrapper, events, command):
        blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
        return AzureUploadManager(blob_service, events)

    @classmethod
    def get_transfer_from_http_or_ftp_manager(cls, source_wrapper, destination_wrapper, events, command):
        blob_service = cls.get_blob_service(destination_wrapper.bucket, read=True, write=True)
        return TransferFromHttpOrFtpToAzureManager(blob_service, events)

    @classmethod
    def get_blob_service(cls, *args, **kwargs):
        return ProxyBlockBlobService(*args, **kwargs)
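

# Illustrative usage sketch only: the wrapper, events and command objects below are assumed to be
# supplied by the pipe-cli transfer layer rather than constructed by hand.
#
#     manager = AzureBucketOperations.get_upload_manager(source_wrapper, destination_wrapper,
#                                                        events, command='cp')
#     manager.transfer(source_wrapper, destination_wrapper,
#                      path='sample.txt', relative_path='sample.txt',
#                      size=os.path.getsize('sample.txt'))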