pipe-cli/src/utilities/storage/s3.py (1,199 lines of code) (raw):

# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from boto3.s3.transfer import TransferConfig from botocore.endpoint import BotocoreHTTPSession, MAX_POOL_CONNECTIONS from treelib import Tree from src.model.datastorage_usage_model import StorageUsage from src.utilities.audit import DataAccessEvent, DataAccessType from src.utilities.datastorage_lifecycle_manager import DataStorageLifecycleManager from src.utilities.encoding_utilities import to_string from src.utilities.storage.s3_proxy_utils import AwsProxyConnectWithHeadersHTTPSAdapter from src.utilities.storage.storage_usage import StorageUsageAccumulator from src.utilities.storage.s3_checksum import ChecksumProcessor import collections import os from boto3 import Session from botocore.config import Config as AwsConfig from botocore.credentials import RefreshableCredentials, Credentials from botocore.exceptions import ClientError from botocore.session import get_session from s3transfer.manager import TransferManager from s3transfer import upload, tasks, copies from src.api.data_storage import DataStorage from src.model.data_storage_item_model import DataStorageItemModel, DataStorageItemLabelModel from src.utilities.debug_utils import debug_log_proxies from src.utilities.patterns import PatternMatcher from src.utilities.progress_bar import ProgressPercentage from src.utilities.storage.common import StorageOperations, AbstractListingManager, AbstractDeleteManager, \ AbstractRestoreManager, AbstractTransferManager, TransferResult, UploadResult from src.config import Config import requests requests.urllib3.disable_warnings() import botocore.vendored.requests.packages.urllib3 as boto_urllib3 boto_urllib3.disable_warnings() class UploadedObjectsContainer: def __init__(self): self._objects = {} def add(self, bucket, key, data): self._objects[bucket + '/' + key] = data def pop(self, bucket, key): data = self._objects[bucket + '/' + key] del self._objects[bucket + '/' + key] return data uploaded_objects_container = UploadedObjectsContainer() checksum_processor = ChecksumProcessor() def _upload_part_task_main(self, client, fileobj, bucket, key, upload_id, part_number, extra_args): if checksum_processor.enabled: extra_args['ContentMD5'] = '' with fileobj as body: response = client.upload_part( Bucket=bucket, Key=key, UploadId=upload_id, PartNumber=part_number, Body=body, **extra_args) result = {'PartNumber': part_number, 'ETag': response['ETag']} if checksum_processor.enabled and not checksum_processor.skip: result.update({checksum_processor.boto_field: response['ResponseMetadata']['HTTPHeaders'][checksum_processor.checksum_header]}) return result def _put_object_task_main(self, client, fileobj, bucket, key, extra_args): if checksum_processor.enabled: extra_args['ContentMD5'] = '' with fileobj as body: output = client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args) uploaded_objects_container.add(bucket, key, output.get('VersionId')) def _complete_multipart_upload_task_main(self, client, bucket, key, upload_id, parts, extra_args): output = client.complete_multipart_upload( Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts}, **extra_args) uploaded_objects_container.add(bucket, key, output.get('VersionId')) def _copy_object_task_main(self, client, copy_source, bucket, key, extra_args, callbacks, size): output = client.copy_object( CopySource=copy_source, Bucket=bucket, Key=key, **extra_args) for callback in callbacks: callback(bytes_transferred=size) uploaded_objects_container.add(bucket, key, output.get('VersionId')) # By default boto library doesn't aggregate uploaded object versions # which have to be downloaded via an extra request for each individual object. # This monkey patching allows to aggregate uploaded object versions # and use them later on without any extra requests being performed. upload.PutObjectTask._main = _put_object_task_main upload.UploadPartTask._main = _upload_part_task_main tasks.CompleteMultipartUploadTask._main = _complete_multipart_upload_task_main copies.CopyObjectTask._main = _copy_object_task_main class StorageItemManager(object): def __init__(self, session, events=None, bucket=None, region_name=None, cross_region=False, endpoint=None): self.session = session self.events = events self.region_name = region_name self.endpoint = endpoint _boto_config = S3BucketOperations.get_boto_config(cross_region=cross_region) self.s3 = session.resource('s3', config=_boto_config, region_name=self.region_name, endpoint_url=endpoint, verify=False if endpoint else None) self.s3.meta.client._endpoint.http_session = BotocoreHTTPSession( max_pool_connections=MAX_POOL_CONNECTIONS, http_adapter_cls=AwsProxyConnectWithHeadersHTTPSAdapter) if bucket: self.bucket = self.s3.Bucket(bucket) debug_log_proxies(_boto_config) @classmethod def show_progress(cls, quiet, size, lock=None): return StorageOperations.show_progress(quiet, size, lock) @classmethod def _convert_tags_to_url_string(cls, tags): if not tags: return tags tags = StorageOperations.preprocess_tags(tags) return '&'.join(['%s=%s' % (key, value) for key, value in tags.items()]) def _get_client(self): _boto_config = S3BucketOperations.get_boto_config() client = self.session.client('s3', config=_boto_config, region_name=self.region_name, endpoint_url=self.endpoint, verify=False if self.endpoint else None) client._endpoint.http_session.adapters['https://'] = BotocoreHTTPSession( max_pool_connections=MAX_POOL_CONNECTIONS, http_adapter_cls=AwsProxyConnectWithHeadersHTTPSAdapter) debug_log_proxies(_boto_config) return client def get_s3_file_size(self, bucket, key): try: client = self._get_client() item = client.head_object(Bucket=bucket, Key=key) if 'DeleteMarker' in item: return None if 'ContentLength' in item: return int(item['ContentLength']) return None except ClientError: return None def get_s3_file_object_head(self, bucket, key): try: client = self._get_client() item = client.head_object(Bucket=bucket, Key=key) if 'DeleteMarker' in item: return None, None return int(item['ContentLength']), item.get('LastModified', None) except ClientError: return None, None def get_s3_file_version(self, bucket, key): try: client = self._get_client() item = client.head_object(Bucket=bucket, Key=key) if 'DeleteMarker' in item: return None return item.get('VersionId') except ClientError: return None def get_uploaded_s3_file_version(self, bucket, key): return uploaded_objects_container.pop(bucket, key) @staticmethod def get_local_file_size(path): return StorageOperations.get_local_file_size(path) @staticmethod def get_local_modification_datetime(path): return StorageOperations.get_local_file_modification_datetime(path) def get_transfer_config(self, io_threads): transfer_config = TransferConfig() if io_threads is not None: transfer_config.max_concurrency = max(io_threads, 1) transfer_config.use_threads = transfer_config.max_concurrency > 1 return transfer_config class DownloadManager(StorageItemManager, AbstractTransferManager): def __init__(self, session, bucket, events, region_name=None, endpoint=None): super(DownloadManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name, endpoint=endpoint) def get_destination_key(self, destination_wrapper, relative_path): if destination_wrapper.path.endswith(os.path.sep): return os.path.join(destination_wrapper.path, relative_path) else: return destination_wrapper.path def get_source_key(self, source_wrapper, path): return path or source_wrapper.path def get_destination_size(self, destination_wrapper, destination_key): return self.get_local_file_size(destination_key) def get_destination_object_head(self, destination_wrapper, destination_key): return self.get_local_file_size(destination_key), \ StorageOperations.get_local_file_modification_datetime(destination_key) def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, size=None, tags=None, io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False): source_key = self.get_source_key(source_wrapper, path) destination_key = self.get_destination_key(destination_wrapper, relative_path) self.create_local_folder(destination_key, lock) transfer_config = self.get_transfer_config(io_threads) if StorageItemManager.show_progress(quiet, size, lock): progress_callback = ProgressPercentage(relative_path, size) else: progress_callback = None self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket)) self.bucket.download_file(source_key, to_string(destination_key), Callback=progress_callback, Config=transfer_config) if clean: self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket)) source_wrapper.delete_item(source_key) class DownloadStreamManager(StorageItemManager, AbstractTransferManager): def __init__(self, session, bucket, events, region_name=None, endpoint=None): super(DownloadStreamManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name, endpoint=endpoint) def get_destination_key(self, destination_wrapper, relative_path): return destination_wrapper.path def get_source_key(self, source_wrapper, path): return path or source_wrapper.path def get_destination_size(self, destination_wrapper, destination_key): return 0 def get_destination_object_head(self, destination_wrapper, destination_key): return 0, None def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, size=None, tags=None, io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False): source_key = self.get_source_key(source_wrapper, path) destination_key = self.get_destination_key(destination_wrapper, relative_path) transfer_config = self.get_transfer_config(io_threads) if StorageItemManager.show_progress(quiet, size, lock): progress_callback = ProgressPercentage(relative_path, size) else: progress_callback = None self.events.put(DataAccessEvent(source_key, DataAccessType.READ, storage=source_wrapper.bucket)) self.bucket.download_fileobj(source_key, destination_wrapper.get_output_stream(destination_key), Callback=progress_callback, Config=transfer_config) if clean: self.events.put(DataAccessEvent(source_key, DataAccessType.DELETE, storage=source_wrapper.bucket)) source_wrapper.delete_item(source_key) class UploadManager(StorageItemManager, AbstractTransferManager): def __init__(self, session, bucket, events, region_name=None, endpoint=None): super(UploadManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name, endpoint=endpoint) def get_destination_key(self, destination_wrapper, relative_path): return S3BucketOperations.normalize_s3_path(destination_wrapper, relative_path) def get_destination_size(self, destination_wrapper, destination_key): return self.get_s3_file_size(destination_wrapper.bucket.path, destination_key) def get_destination_object_head(self, destination_wrapper, destination_key): return self.get_s3_file_object_head(destination_wrapper.bucket.path, destination_key) def get_source_key(self, source_wrapper, source_path): if source_path: return os.path.join(source_wrapper.path, source_path) else: return source_wrapper.path def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False): source_key = self.get_source_key(source_wrapper, path) destination_key = self.get_destination_key(destination_wrapper, relative_path) tags = StorageOperations.generate_tags(tags, source_key) extra_args = { 'Tagging': self._convert_tags_to_url_string(tags), 'ACL': 'bucket-owner-full-control' } TransferManager.ALLOWED_UPLOAD_ARGS.append('Tagging') transfer_config = self.get_transfer_config(io_threads) if StorageItemManager.show_progress(quiet, size, lock): progress_callback = ProgressPercentage(relative_path, size) else: progress_callback = None self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket)) if checksum_algorithm: checksum_processor.init(checksum_algorithm, checksum_skip) checksum_processor.prepare_boto_client(self.s3.meta.client) self.bucket.upload_file(to_string(source_key), destination_key, Callback=progress_callback, Config=transfer_config, ExtraArgs=extra_args) if clean: source_wrapper.delete_item(source_key) version = self.get_uploaded_s3_file_version(destination_wrapper.bucket.path, destination_key) return UploadResult(source_key=source_key, destination_key=destination_key, destination_version=version, tags=tags) class UploadStreamManager(StorageItemManager, AbstractTransferManager): def __init__(self, session, bucket, events, region_name=None, endpoint=None): super(UploadStreamManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name, endpoint=endpoint) def get_destination_key(self, destination_wrapper, relative_path): return S3BucketOperations.normalize_s3_path(destination_wrapper, relative_path) def get_destination_size(self, destination_wrapper, destination_key): return self.get_s3_file_size(destination_wrapper.bucket.path, destination_key) def get_destination_object_head(self, destination_wrapper, destination_key): return self.get_s3_file_object_head(destination_wrapper.bucket.path, destination_key) def get_source_key(self, source_wrapper, source_path): return source_path or source_wrapper.path def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False): source_key = self.get_source_key(source_wrapper, path) destination_key = self.get_destination_key(destination_wrapper, relative_path) tags = StorageOperations.generate_tags(tags, source_key) extra_args = { 'Tagging': self._convert_tags_to_url_string(tags), 'ACL': 'bucket-owner-full-control' } TransferManager.ALLOWED_UPLOAD_ARGS.append('Tagging') transfer_config = self.get_transfer_config(io_threads) if StorageItemManager.show_progress(quiet, size, lock): progress_callback = ProgressPercentage(relative_path, size) else: progress_callback = None self.events.put(DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket)) self.bucket.upload_fileobj(source_wrapper.get_input_stream(source_key), destination_key, Callback=progress_callback, Config=transfer_config, ExtraArgs=extra_args) version = self.get_uploaded_s3_file_version(destination_wrapper.bucket.path, destination_key) return UploadResult(source_key=source_key, destination_key=destination_key, destination_version=version, tags=tags) class TransferBetweenBucketsManager(StorageItemManager, AbstractTransferManager): def __init__(self, session, bucket, events, region_name=None, cross_region=False, endpoint=None): self.cross_region = cross_region super(TransferBetweenBucketsManager, self).__init__(session, events=events, bucket=bucket, region_name=region_name, cross_region=cross_region, endpoint=endpoint) def get_destination_key(self, destination_wrapper, relative_path): return S3BucketOperations.normalize_s3_path(destination_wrapper, relative_path) def get_destination_size(self, destination_wrapper, destination_key): return self.get_s3_file_size(destination_wrapper.bucket.path, destination_key) def get_destination_object_head(self, destination_wrapper, destination_key): return self.get_s3_file_object_head(destination_wrapper.bucket.path, destination_key) def get_source_key(self, source_wrapper, source_path): return source_path def transfer(self, source_wrapper, destination_wrapper, path=None, relative_path=None, clean=False, quiet=False, size=None, tags=(), io_threads=None, lock=None, checksum_algorithm='md5', checksum_skip=False): # checked is bucket and file source_bucket = source_wrapper.bucket.path source_region = source_wrapper.bucket.region source_endpoint = source_wrapper.bucket.endpoint destination_key = self.get_destination_key(destination_wrapper, relative_path) copy_source = { 'Bucket': source_bucket, 'Key': path } source_client = self.build_source_client(source_region, source_endpoint) extra_args = { 'ACL': 'bucket-owner-full-control' } self.events.put_all([DataAccessEvent(path, DataAccessType.READ, storage=source_wrapper.bucket), DataAccessEvent(destination_key, DataAccessType.WRITE, storage=destination_wrapper.bucket)]) if StorageItemManager.show_progress(quiet, size, lock): self.bucket.copy(copy_source, destination_key, Callback=ProgressPercentage(relative_path, size), ExtraArgs=extra_args, SourceClient=source_client) else: self.bucket.copy(copy_source, destination_key, ExtraArgs=extra_args, SourceClient=source_client) if clean: self.events.put(DataAccessEvent(path, DataAccessType.DELETE, storage=source_wrapper.bucket)) source_wrapper.delete_item(path) version = self.get_uploaded_s3_file_version(destination_wrapper.bucket.path, destination_key) return TransferResult(source_key=path, destination_key=destination_key, destination_version=version, tags=StorageOperations.parse_tags(tags)) def build_source_client(self, source_region, source_endpoint): _boto_config = S3BucketOperations.get_boto_config(cross_region=self.cross_region) source_s3 = self.session.resource('s3', config=_boto_config, region_name=source_region, endpoint_url=source_endpoint, verify=False if source_endpoint else None) source_s3.meta.client._endpoint.http_session = BotocoreHTTPSession( max_pool_connections=MAX_POOL_CONNECTIONS, http_adapter_cls=AwsProxyConnectWithHeadersHTTPSAdapter) debug_log_proxies(_boto_config) return source_s3.meta.client @classmethod def has_required_tag(cls, tags, tag_name): for tag in tags: if tag.startswith(tag_name): return True return False @classmethod def convert_object_tags(cls, object_tags): tags = () for tag in object_tags: if 'Key' in tag and 'Value' in tag: tags += ('{}={}'.format(tag['Key'], tag['Value']),) return tags class RestoreManager(StorageItemManager, AbstractRestoreManager): VERSION_NOT_EXISTS_ERROR = 'Version "%s" doesn\'t exist.' def __init__(self, bucket, session, events, region_name=None, endpoint=None): super(RestoreManager, self).__init__(session, events=events, region_name=region_name, endpoint=endpoint) self.bucket = bucket self.listing_manager = bucket.get_list_manager(True) def restore_version(self, version, exclude=[], include=[], recursive=False): client = self._get_client() bucket = self.bucket.bucket.path file_items = self._list_file_items() if not recursive or self._has_deleted_file(file_items): if not version: version = self._find_last_version(self.bucket.path, file_items) self.restore_file_version(version, bucket, client, file_items) return self.restore_folder(bucket, client, exclude, include, recursive) def restore_file_version(self, version, bucket, client, file_items): relative_path = self.bucket.path self._validate_version(bucket, client, version, file_items) try: self.events.put(DataAccessEvent(relative_path, DataAccessType.WRITE, storage=self.bucket.bucket)) copied_object = client.copy_object(Bucket=bucket, Key=relative_path, CopySource=dict(Bucket=bucket, Key=relative_path, VersionId=version)) client.delete_objects(Bucket=bucket, Delete=dict(Objects=[dict(Key=relative_path, VersionId=version)])) DataStorage.batch_copy_object_tags(self.bucket.bucket.identifier, [{ 'source': { 'path': relative_path, 'version': version }, 'destination': { 'path': relative_path } }, { 'source': { 'path': relative_path, 'version': version }, 'destination': { 'path': relative_path, 'version': copied_object['VersionId'] } }]) DataStorage.batch_delete_object_tags(self.bucket.bucket.identifier, [{ 'path': relative_path, 'version': version }]) except ClientError as e: error_message = str(e) if 'delete marker' in error_message: text = "Cannot restore a delete marker" elif 'Invalid version' in error_message: text = self.VERSION_NOT_EXISTS_ERROR % version else: text = error_message raise RuntimeError(text) def load_item(self, bucket, client): try: item = client.head_object(Bucket=bucket, Key=self.bucket.path) except ClientError as e: error_message = str(e) if 'Not Found' in error_message: return self.load_delete_marker(bucket, self.bucket.path, client) raise RuntimeError('Requested file "{}" doesn\'t exist. {}.'.format(self.bucket.path, error_message)) if item is None: raise RuntimeError('Path "{}" doesn\'t exist'.format(self.bucket.path)) return item def _list_file_items(self): relative_path = self.bucket.path all_items = self.listing_manager.list_items(relative_path, show_all=True) item_name = relative_path.split(S3BucketOperations.S3_PATH_SEPARATOR)[-1] return [item for item in all_items if item.type == 'File' and item.name == item_name] def load_delete_marker(self, bucket, path, client, quite=False): operation_parameters = { 'Bucket': bucket, 'Prefix': path } paginator = client.get_paginator('list_object_versions') pages = paginator.paginate(**operation_parameters) for page in pages: if 'Versions' not in page and 'DeleteMarkers' not in page: raise RuntimeError('Requested file "{}" doesn\'t exist.'.format(path)) if 'DeleteMarkers' not in page: continue for item in page['DeleteMarkers']: if 'IsLatest' in item and item['IsLatest'] and path == item['Key']: return item if not quite: raise RuntimeError('Latest file version is not deleted. Please specify "--version" parameter.') @staticmethod def _find_last_version(relative_path, file_items): if not file_items or not len(file_items): raise RuntimeError('Requested file "%s" doesn\'t exist.' % relative_path) item = file_items[0] if not item: raise RuntimeError('Failed to receive deleted marker') if not item.delete_marker: raise RuntimeError('Latest file version is not deleted. Please specify "--version" parameter.') versions = [item_version for item_version in item.versions if not item_version.delete_marker] if not versions or not len(versions): raise RuntimeError('Latest file version is not deleted. Please specify "--version" parameter.') version = versions[0].version if not version: raise RuntimeError('Failed to find last version') return version @staticmethod def _has_deleted_file(file_items): return file_items and len(file_items) and file_items[0] and file_items[0].delete_marker def restore_folder(self, bucket, client, exclude, include, recursive): delimiter = S3BucketOperations.S3_PATH_SEPARATOR path = self.bucket.path prefix = StorageOperations.get_prefix(path) if not prefix.endswith(delimiter): prefix += delimiter operation_parameters = { 'Bucket': bucket } if path: operation_parameters['Prefix'] = prefix if not recursive: operation_parameters['Delimiter'] = delimiter paginator = client.get_paginator('list_object_versions') pages = paginator.paginate(**operation_parameters) restore_items = [] for page in pages: S3BucketOperations.process_listing(page, 'DeleteMarkers', restore_items, delimiter, exclude, include, prefix, versions=True) restore_items, flushing_items = S3BucketOperations.split_by_aws_limit(restore_items) if flushing_items: self._restore_objects(client, bucket, flushing_items) if restore_items: self._restore_objects(client, bucket, restore_items) def _restore_objects(self, client, bucket, items): self.events.put_all([DataAccessEvent(item['Key'], DataAccessType.WRITE, storage=self.bucket.bucket) for item in items]) client.delete_objects(Bucket=bucket, Delete=dict(Objects=items)) def _validate_version(self, bucket, client, version, file_items): current_item = self.load_item(bucket, client) if current_item['VersionId'] == version: raise RuntimeError('Version "{}" is already the latest version'.format(version)) if not file_items: raise RuntimeError(self.VERSION_NOT_EXISTS_ERROR % version) item = file_items[0] if not any(item.version == version for item in item.versions): raise RuntimeError(self.VERSION_NOT_EXISTS_ERROR % version) class DeleteManager(StorageItemManager, AbstractDeleteManager): def __init__(self, bucket, session, events, region_name=None, endpoint=None): super(DeleteManager, self).__init__(session, events=events, region_name=region_name, endpoint=endpoint) self.bucket = bucket def delete_items(self, relative_path, recursive=False, exclude=[], include=[], version=None, hard_delete=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE): client = self._get_client() delimiter = S3BucketOperations.S3_PATH_SEPARATOR bucket = self.bucket.bucket.path prefix = StorageOperations.get_prefix(relative_path) if not recursive and not hard_delete: delete_items = [] if version is not None: delete_items.append(dict(Key=prefix, VersionId=version)) else: delete_items.append(dict(Key=prefix)) self.events.put(DataAccessEvent(prefix, DataAccessType.DELETE, storage=self.bucket.bucket)) client.delete_objects(Bucket=bucket, Delete=dict(Objects=delete_items)) if self.bucket.bucket.policy.versioning_enabled: if version: latest_version = self.get_s3_file_version(bucket, prefix) if latest_version: DataStorage.batch_copy_object_tags(self.bucket.bucket.identifier, [{ 'source': { 'path': relative_path, 'version': latest_version }, 'destination': { 'path': relative_path } }]) self._delete_object_tags(delete_items) else: delete_items.append(dict(Key=prefix)) self._delete_object_tags(delete_items) else: self._delete_object_tags(delete_items) else: operation_parameters = { 'Bucket': bucket, 'Prefix': prefix, 'PaginationConfig': { 'PageSize': page_size } } if hard_delete: paginator = client.get_paginator('list_object_versions') else: paginator = client.get_paginator('list_objects_v2') pages = paginator.paginate(**operation_parameters) delete_items = [] for page in pages: S3BucketOperations.process_listing(page, 'Contents', delete_items, delimiter, exclude, include, prefix) S3BucketOperations.process_listing(page, 'Versions', delete_items, delimiter, exclude, include, prefix, versions=True) S3BucketOperations.process_listing(page, 'DeleteMarkers', delete_items, delimiter, exclude, include, prefix, versions=True) delete_items, flushing_items = S3BucketOperations.split_by_aws_limit(delete_items) if flushing_items: self._delete_objects(client, bucket, hard_delete, flushing_items) if delete_items: self._delete_objects(client, bucket, hard_delete, delete_items) def _delete_objects(self, client, bucket, hard_delete, items): if not self.bucket.bucket.policy.versioning_enabled or hard_delete: self._delete_all_object_tags(items) self.events.put_all([DataAccessEvent(item['Key'], DataAccessType.DELETE, storage=self.bucket.bucket) for item in items]) client.delete_objects(Bucket=bucket, Delete=dict(Objects=items)) def _delete_all_object_tags(self, items, chunk_size=100): item_names = list(set(item['Key'] for item in items)) for item_names_chunk in [item_names[i:i + chunk_size] for i in range(0, len(item_names), chunk_size)]: DataStorage.batch_delete_all_object_tags(self.bucket.bucket.identifier, [{'path': item_name} for item_name in item_names_chunk]) def _delete_object_tags(self, items, chunk_size=100): for items_chunk in [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]: DataStorage.batch_delete_object_tags(self.bucket.bucket.identifier, [{'path': item['Key'], 'version': item.get('VersionId')} for item in items_chunk]) class ListingManager(StorageItemManager, AbstractListingManager): DEFAULT_PAGE_SIZE = StorageOperations.DEFAULT_PAGE_SIZE def __init__(self, bucket, session, show_versions=False, region_name=None, endpoint=None): super(ListingManager, self).__init__(session, region_name=region_name, endpoint=endpoint) self.bucket = bucket self.show_versions = show_versions def list_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, show_all=False, show_archive=False): delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { 'Bucket': self.bucket.bucket.path } if not show_all: operation_parameters['PaginationConfig'] = { 'MaxItems': page_size, 'PageSize': page_size } else: page_size = None if not recursive: operation_parameters['Delimiter'] = delimiter prefix = S3BucketOperations.get_prefix(delimiter, relative_path) if relative_path: operation_parameters['Prefix'] = prefix if self.show_versions: return self.list_versions(client, prefix, operation_parameters, recursive, page_size, show_archive) else: return self.list_objects(client, prefix, operation_parameters, recursive, page_size, show_archive) def list_paging_items(self, relative_path=None, recursive=False, page_size=StorageOperations.DEFAULT_PAGE_SIZE, start_token=None, show_archive=False): delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { 'Bucket': self.bucket.bucket.path, 'PaginationConfig': { 'PageSize': page_size, 'MaxItems': page_size, } } prefix = S3BucketOperations.get_prefix(delimiter, relative_path) if relative_path: operation_parameters['Prefix'] = prefix return self.list_paging_objects(client, prefix, operation_parameters, recursive, start_token, show_archive) def get_summary_with_depth(self, max_depth, relative_path=None): bucket_name = self.bucket.bucket.path delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { 'Bucket': bucket_name } prefix = S3BucketOperations.get_prefix(delimiter, relative_path) root_path = relative_path if relative_path: operation_parameters['Prefix'] = prefix prefix_tokens = prefix.split(delimiter) prefix_tokens_len = len(prefix_tokens) max_depth += prefix_tokens_len root_path = '' if prefix_tokens_len == 1 else delimiter.join(prefix_tokens[:-1]) paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) accumulator = StorageUsageAccumulator(bucket_name, root_path, delimiter, max_depth) for page in page_iterator: if 'Contents' in page: for file in page['Contents']: name = self.get_file_name(file, prefix, True) size = file['Size'] tier = file['StorageClass'] accumulator.add_path(name, tier, size) if not page['IsTruncated']: break result_tree = accumulator.get_tree() if relative_path and root_path != relative_path and not relative_path.endswith(delimiter): root_path = delimiter.join([bucket_name, root_path]) if root_path else bucket_name result_tree[root_path].data = None return result_tree def get_listing_with_depth(self, max_depth, relative_path=None): bucket_name = self.bucket.bucket.path client = self._get_client() delimiter = S3BucketOperations.S3_PATH_SEPARATOR operation_parameters = { 'Bucket': bucket_name, 'Delimiter': delimiter } prefix = S3BucketOperations.get_prefix(delimiter, relative_path) if relative_path: if relative_path.endswith(delimiter): yield relative_path operation_parameters['Prefix'] = prefix paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) for page in page_iterator: if 'CommonPrefixes' not in page: return for folder in page.get('CommonPrefixes'): for item in self._list_folders(S3BucketOperations.get_prefix(delimiter, folder['Prefix']), delimiter, 1, max_depth, operation_parameters, client): yield item else: for item in self._list_folders(prefix, delimiter, 1, max_depth, operation_parameters, client): yield item def _list_folders(self, prefix, delimiter, current_depth, max_depth, operation_parameters, client): if current_depth > max_depth: return if prefix != delimiter: operation_parameters['Prefix'] = prefix yield prefix else: yield '' paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) for page in page_iterator: if 'CommonPrefixes' not in page: return for folder in page.get('CommonPrefixes'): folder_prefix = S3BucketOperations.get_prefix(delimiter, folder['Prefix']) if current_depth == max_depth: yield folder_prefix else: for item in self._list_folders(folder_prefix, delimiter, current_depth + 1, max_depth, operation_parameters, client): yield item def get_summary(self, relative_path=None): delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = self._get_client() operation_parameters = { 'Bucket': self.bucket.bucket.path } prefix = S3BucketOperations.get_prefix(delimiter, relative_path) if relative_path: operation_parameters['Prefix'] = prefix paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) storage_usage = StorageUsage() for page in page_iterator: if 'Contents' in page: for file in page['Contents']: if self.prefix_match(file, relative_path): storage_usage.add_item(file["StorageClass"], file['Size']) if not page['IsTruncated']: break return delimiter.join([self.bucket.bucket.path, relative_path]), storage_usage @classmethod def prefix_match(cls, page_file, relative_path=None): if not relative_path: return True if 'Key' not in page_file or not page_file['Key']: return False key = page_file['Key'] if key == relative_path: return True if relative_path.endswith(S3BucketOperations.S3_PATH_SEPARATOR): return True if key.startswith("%s%s" % (relative_path, S3BucketOperations.S3_PATH_SEPARATOR)): return True return False def list_versions(self, client, prefix, operation_parameters, recursive, page_size, show_archive): paginator = client.get_paginator('list_object_versions') page_iterator = paginator.paginate(**operation_parameters) items = [] item_keys = collections.OrderedDict() items_count = 0 lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag) for page in page_iterator: if 'CommonPrefixes' in page: for folder in page['CommonPrefixes']: name = S3BucketOperations.get_item_name(folder['Prefix'], prefix=prefix) items.append(self.get_folder_object(name)) items_count += 1 if 'Versions' in page: for version in page['Versions']: name = self.get_file_name(version, prefix, recursive) restore_status = None if version['StorageClass'] != 'STANDARD' and version['StorageClass'] != 'INTELLIGENT_TIERING': restore_status, versions_restored = lifecycle_manager.find_lifecycle_status(name) version_not_restored = restore_status and not version['IsLatest'] and not versions_restored if not show_archive: if not restore_status or version_not_restored: continue else: if version_not_restored: restore_status = None item = self.get_file_object(version, name, version=True, lifecycle_status=restore_status) self.process_version(item, item_keys, name) if 'DeleteMarkers' in page: for delete_marker in page['DeleteMarkers']: name = self.get_file_name(delete_marker, prefix, recursive) item = self.get_file_object(delete_marker, name, version=True, storage_class=False) item.delete_marker = True self.process_version(item, item_keys, name) items_count += len(item_keys) if self.need_to_stop_paging(page, page_size, items_count): break items.extend(item_keys.values()) for item in items: item.versions.sort(key=lambda x: x.changed, reverse=True) return items def process_version(self, item, item_keys, name): if name not in item_keys: item.versions = [item] item_keys[name] = item else: previous = item_keys[name] versions = previous.versions versions.append(item) if item.latest: item.versions = versions item_keys[name] = item def list_objects(self, client, prefix, operation_parameters, recursive, page_size, show_archive): paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) items = [] items_count = 0 lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag) for page in page_iterator: if 'CommonPrefixes' in page: for folder in page['CommonPrefixes']: name = S3BucketOperations.get_item_name(folder['Prefix'], prefix=prefix) items.append(self.get_folder_object(name)) items_count += 1 if 'Contents' in page: for file in page['Contents']: name = self.get_file_name(file, prefix, recursive) lifecycle_status = None if file['StorageClass'] != 'STANDARD' and file['StorageClass'] != 'INTELLIGENT_TIERING': lifecycle_status, _ = lifecycle_manager.find_lifecycle_status(name) if not show_archive and not lifecycle_status: continue item = self.get_file_object(file, name, lifecycle_status=lifecycle_status) items.append(item) items_count += 1 if self.need_to_stop_paging(page, page_size, items_count): break return items def list_paging_objects(self, client, prefix, operation_parameters, recursive, start_token, show_archive): if start_token: operation_parameters['ContinuationToken'] = start_token paginator = client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(**operation_parameters) lifecycle_manager = DataStorageLifecycleManager(self.bucket.bucket.identifier, prefix, self.bucket.is_file_flag) items = [] for page in page_iterator: if 'CommonPrefixes' in page: for folder in page['CommonPrefixes']: name = S3BucketOperations.get_item_name(folder['Prefix'], prefix=prefix) items.append(self.get_folder_object(name)) if 'Contents' in page: for file in page['Contents']: name = self.get_file_name(file, prefix, recursive) lifecycle_status = None if file['StorageClass'] != 'STANDARD' and file['StorageClass'] != 'INTELLIGENT_TIERING': lifecycle_status, _ = lifecycle_manager.find_lifecycle_status(name) if not show_archive and not lifecycle_status: continue item = self.get_file_object(file, name, lifecycle_status=lifecycle_status) items.append(item) return items, page.get('NextContinuationToken', None) if page else None def get_file_object(self, file, name, version=False, storage_class=True, lifecycle_status=None): item = DataStorageItemModel() item.type = 'File' item.name = name if 'Size' in file: item.size = file['Size'] item.path = name item.changed = file['LastModified'].astimezone(Config.instance().timezone()) if storage_class: lifecycle_status = lifecycle_status if lifecycle_status else '' item.labels = [DataStorageItemLabelModel('StorageClass', file['StorageClass'] + lifecycle_status)] if version: item.version = file['VersionId'] item.latest = file['IsLatest'] return item def get_file_name(self, file, prefix, recursive): if recursive: name = file['Key'] else: name = S3BucketOperations.get_item_name(file['Key'], prefix=prefix) return name def get_folder_object(self, name): item = DataStorageItemModel() item.type = 'Folder' item.name = name item.path = name return item def get_items(self, relative_path): return S3BucketOperations.get_items(self.bucket, session=self.session) def get_file_tags(self, relative_path): return ObjectTaggingManager.get_object_tagging(ObjectTaggingManager( self.session, self.bucket, self.region_name, endpoint=self.endpoint), relative_path) @staticmethod def need_to_stop_paging(page, page_size, items_count): if 'IsTruncated' in page and not page['IsTruncated']: return True if page_size and items_count >= page_size: return True return False class ObjectTaggingManager(StorageItemManager): def __init__(self, session, bucket, region_name=None, endpoint=None): super(ObjectTaggingManager, self).__init__(session, region_name=region_name, endpoint=endpoint) self.bucket = bucket def get_object_tagging(self, source): client = self._get_client() response = client.get_object_tagging( Bucket=self.bucket, Key=source ) return response['TagSet'] class S3BucketOperations(object): S3_ENDPOINT_URL = 'https://s3.amazonaws.com' S3_PATH_SEPARATOR = StorageOperations.PATH_SEPARATOR S3_REQUEST_ELEMENTS_LIMIT = 1000 __config__ = None @classmethod def get_boto_config(cls, cross_region=False): max_attempts = os.getenv('CP_AWS_MAX_ATTEMPTS') retries = {'max_attempts': int(max_attempts)} if max_attempts else None if cls.__config__ is None: cls.__config__ = Config.instance() if cls.__config__.proxy is None: if cross_region: os.environ['no_proxy'] = '' return AwsConfig(retries=retries) else: return AwsConfig(proxies=cls.__config__.resolve_proxy(target_url=cls.S3_ENDPOINT_URL), retries=retries) @classmethod def _get_client(cls, session, region_name=None, endpoint=None): _boto_config = S3BucketOperations.get_boto_config() client = session.client('s3', config=_boto_config, region_name=region_name, endpoint_url=endpoint, verify=False if endpoint else None) client._endpoint.http_session.adapters['https://'] = BotocoreHTTPSession( max_pool_connections=MAX_POOL_CONNECTIONS, http_adapter_cls=AwsProxyConnectWithHeadersHTTPSAdapter) debug_log_proxies(_boto_config) return client @classmethod def init_wrapper(cls, storage_wrapper, session=None, versioning=False): if storage_wrapper.is_local(): return storage_wrapper delimiter = S3BucketOperations.S3_PATH_SEPARATOR prefix = cls.get_prefix(delimiter, storage_wrapper.path) check_file = True if prefix.endswith(cls.S3_PATH_SEPARATOR): prefix = prefix[:-1] check_file = False if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp', versioning=versioning) storage_wrapper.session = session client = cls._get_client(session, region_name=storage_wrapper.bucket.region, endpoint=storage_wrapper.bucket.endpoint) if versioning: paginator = client.get_paginator('list_object_versions') else: paginator = client.get_paginator('list_objects_v2') operation_parameters = { 'Bucket': storage_wrapper.bucket.path, 'Prefix': prefix, 'Delimiter': delimiter, 'PaginationConfig': { 'MaxItems': 5, 'PageSize': 5 } } page_iterator = paginator.paginate(**operation_parameters) for page in page_iterator: if check_file: if cls.check_section(storage_wrapper, prefix, page, 'DeleteMarkers'): break if cls.check_section(storage_wrapper, prefix, page, 'Versions'): break if cls.check_section(storage_wrapper, prefix, page, 'Contents'): break if 'CommonPrefixes' in page: for page_info in page['CommonPrefixes']: if 'Prefix' in page_info and page_info['Prefix'] == prefix + cls.S3_PATH_SEPARATOR: storage_wrapper.exists_flag = True storage_wrapper.is_file_flag = False storage_wrapper.is_empty_flag = False break return storage_wrapper @classmethod def check_section(cls, storage_wrapper, prefix, page, section): if section in page: for page_info in page[section]: if 'Key' in page_info and page_info['Key'] == prefix: storage_wrapper.exists_flag = True storage_wrapper.is_file_flag = True return True else: return False @classmethod def get_prefix(cls, delimiter, path): return StorageOperations.get_prefix(path, delimiter=delimiter) @classmethod def get_item_name(cls, param, prefix=None): return StorageOperations.get_item_name(param, prefix) @classmethod def get_items(cls, storage_wrapper, session=None): if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') delimiter = S3BucketOperations.S3_PATH_SEPARATOR client = cls._get_client(session, region_name=storage_wrapper.bucket.region, endpoint=storage_wrapper.bucket.endpoint) paginator = client.get_paginator('list_objects_v2') operation_parameters = { 'Bucket': storage_wrapper.bucket.path, } prefix = cls.get_prefix(delimiter, storage_wrapper.path) operation_parameters['Prefix'] = prefix page_iterator = paginator.paginate(**operation_parameters) for page in page_iterator: if 'Contents' in page: for file in page['Contents']: name = cls.get_item_name(file['Key'], prefix=prefix) if name.endswith(delimiter): continue yield ('File', file['Key'], cls.get_prefix(delimiter, name), file['Size'], file['LastModified']) @classmethod def path_exists(cls, storage_wrapper, relative_path, session=None): delimiter = S3BucketOperations.S3_PATH_SEPARATOR prefix = cls.get_prefix(delimiter, storage_wrapper.path) if relative_path: skip_separator = prefix.endswith(S3BucketOperations.S3_PATH_SEPARATOR) if skip_separator: prefix = prefix + relative_path else: prefix = prefix + delimiter + relative_path if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'cp') client = cls._get_client(session, region_name=storage_wrapper.bucket.region, endpoint=storage_wrapper.bucket.endpoint) paginator = client.get_paginator('list_objects_v2') operation_parameters = { 'Bucket': storage_wrapper.bucket.path, 'Prefix': prefix, 'Delimiter': delimiter, 'PaginationConfig': { 'MaxItems': 5, 'PageSize': 5 } } page_iterator = paginator.paginate(**operation_parameters) if not prefix.endswith(cls.S3_PATH_SEPARATOR) and not storage_wrapper.path.endswith(cls.S3_PATH_SEPARATOR): prefix = storage_wrapper.path for page in page_iterator: if cls.check_prefix_existence_for_section(prefix, page, 'Contents', 'Key'): return True if cls.check_prefix_existence_for_section(prefix, page, 'CommonPrefixes', 'Prefix'): return True return False @classmethod def check_prefix_existence_for_section(cls, prefix, page, section, key): if section in page: for page_info in page[section]: if key in page_info and page_info[key] == prefix: return True return False @classmethod def get_list_manager(cls, source_wrapper, show_versions=False): session = cls.assumed_session(source_wrapper.bucket.identifier, None, 'ls', versioning=show_versions) return ListingManager(source_wrapper, session, show_versions=show_versions, region_name=source_wrapper.bucket.region, endpoint=source_wrapper.bucket.endpoint) @classmethod def get_delete_manager(cls, source_wrapper, events, versioning=False): session = cls.assumed_session(source_wrapper.bucket.identifier, None, 'mv', versioning=versioning) return DeleteManager(source_wrapper, session, events, source_wrapper.bucket.region, endpoint=source_wrapper.bucket.endpoint) @classmethod def get_restore_manager(cls, source_wrapper, events): session = cls.assumed_session(source_wrapper.bucket.identifier, None, 'mv', versioning=True) return RestoreManager(source_wrapper, session, events, source_wrapper.bucket.region, endpoint=source_wrapper.bucket.endpoint) @classmethod def delete_item(cls, storage_wrapper, relative_path, session=None): if session is None: session = cls.assumed_session(storage_wrapper.bucket.identifier, None, 'mv') client = cls._get_client(session, region_name=storage_wrapper.bucket.region, endpoint=storage_wrapper.bucket.endpoint) delimiter = S3BucketOperations.S3_PATH_SEPARATOR bucket = storage_wrapper.bucket.path if relative_path: prefix = relative_path if prefix.startswith(delimiter): prefix = prefix[1:] else: prefix = delimiter client.delete_objects(Bucket=bucket, Delete=dict(Objects=[dict(Key= prefix)])) @classmethod def normalize_s3_path(cls, destination_wrapper, relative_path): return StorageOperations.normalize_path(destination_wrapper, relative_path) @classmethod def assumed_session(cls, source_bucket_id, destination_bucket_id, command, versioning=False): def refresh(): credentials = DataStorage.get_temporary_credentials(source_bucket_id, destination_bucket_id, command, versioning=versioning) return dict( access_key=credentials.access_key_id, secret_key=credentials.secret_key, token=credentials.session_token, expiry_time=credentials.expiration) fresh_metadata = refresh() if 'token' not in fresh_metadata or not fresh_metadata['token']: session_credentials = Credentials(fresh_metadata['access_key'], fresh_metadata['secret_key']) else: session_credentials = RefreshableCredentials.create_from_metadata( metadata=fresh_metadata, refresh_using=refresh, method='sts-assume-role') s = get_session() s._credentials = session_credentials return Session(botocore_session=s) @classmethod def get_transfer_between_buckets_manager(cls, source_wrapper, destination_wrapper, events, command): source_id = source_wrapper.bucket.identifier destination_id = destination_wrapper.bucket.identifier session = cls.assumed_session(source_id, destination_id, command) # replace session to be able to delete source for move source_wrapper.session = session destination_bucket = destination_wrapper.bucket.path cross_region = destination_wrapper.bucket.region != source_wrapper.bucket.region return TransferBetweenBucketsManager(session, destination_bucket, events, destination_wrapper.bucket.region, cross_region, endpoint=destination_wrapper.bucket.endpoint) @classmethod def get_download_manager(cls, source_wrapper, destination_wrapper, events, command): source_id = source_wrapper.bucket.identifier session = cls.assumed_session(source_id, None, command) # replace session to be able to delete source for move source_wrapper.session = session source_bucket = source_wrapper.bucket.path return DownloadManager(session, source_bucket, events, region_name=source_wrapper.bucket.region, endpoint=source_wrapper.bucket.endpoint) @classmethod def get_download_stream_manager(cls, source_wrapper, destination_wrapper, events, command): source_id = source_wrapper.bucket.identifier session = cls.assumed_session(source_id, None, command) # replace session to be able to delete source for move source_wrapper.session = session source_bucket = source_wrapper.bucket.path return DownloadStreamManager(session, source_bucket, events, region_name=source_wrapper.bucket.region, endpoint=source_wrapper.bucket.endpoint) @classmethod def get_upload_manager(cls, source_wrapper, destination_wrapper, events, command): destination_id = destination_wrapper.bucket.identifier session = cls.assumed_session(None, destination_id, command) destination_bucket = destination_wrapper.bucket.path return UploadManager(session, destination_bucket, events, region_name=destination_wrapper.bucket.region, endpoint=destination_wrapper.bucket.endpoint) @classmethod def get_upload_stream_manager(cls, source_wrapper, destination_wrapper, events, command): destination_id = destination_wrapper.bucket.identifier session = cls.assumed_session(None, destination_id, command) destination_bucket = destination_wrapper.bucket.path return UploadStreamManager(session, destination_bucket, events, region_name=destination_wrapper.bucket.region, endpoint=destination_wrapper.bucket.endpoint) @classmethod def get_full_path(cls, path, param): delimiter = cls.S3_PATH_SEPARATOR return cls.remove_double_slashes(path + delimiter + param) @classmethod def remove_double_slashes(cls, path): return StorageOperations.remove_double_slashes(path) @staticmethod def process_listing(page, name, items, delimiter, exclude, include, prefix, versions=False): found_file = False if name in page: if not versions: single_file_item = S3BucketOperations.get_single_file_item(name, page, prefix) if single_file_item: S3BucketOperations.add_item_to_deletion(single_file_item, prefix, delimiter, include, exclude, versions, items) return True for item in page[name]: if item is None: break if item['Key'] == prefix: found_file = True if S3BucketOperations.expect_to_delete_file(prefix, item): continue S3BucketOperations.add_item_to_deletion(item, prefix, delimiter, include, exclude, versions, items) return found_file @staticmethod def get_single_file_item(name, page, prefix): single_file_item = None for item in page[name]: if item is None: break if not prefix.endswith(S3BucketOperations.S3_PATH_SEPARATOR) and item['Key'] == prefix: single_file_item = item break return single_file_item @staticmethod def expect_to_delete_file(prefix, item): return not prefix.endswith(S3BucketOperations.S3_PATH_SEPARATOR) and not item['Key'] == prefix \ and not item['Key'].startswith(prefix + S3BucketOperations.S3_PATH_SEPARATOR) @staticmethod def add_item_to_deletion(item, prefix, delimiter, include, exclude, versions, items): name = S3BucketOperations.get_item_name(item['Key'], prefix=prefix) name = S3BucketOperations.get_prefix(delimiter, name) if not PatternMatcher.match_any(name, include): return if PatternMatcher.match_any(name, exclude, default=False): return if versions: items.append(dict(Key=item['Key'], VersionId=item['VersionId'])) else: items.append(dict(Key=item['Key'])) @staticmethod def split_by_aws_limit(items, limit=S3_REQUEST_ELEMENTS_LIMIT): if len(items) < limit: return items, [] flushing_items, remaining_items = items[:limit], items[limit:] return remaining_items, flushing_items