# syndicate/connection/s3_connection.py

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import io from json import dumps from boto3 import resource from botocore.client import Config from botocore.exceptions import ClientError from syndicate.commons import deep_get from syndicate.commons.log_helper import get_logger from syndicate.connection.helper import apply_methods_decorator, retry _LOG = get_logger(__name__) @apply_methods_decorator(retry()) class S3Connection(object): """ S3 connection class.""" def __init__(self, region=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None): self.region = region self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.aws_session_token = aws_session_token self.resource = resource('s3', self.region, config=Config(signature_version='s3v4'), aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, aws_session_token=self.aws_session_token) self.client = self.resource.meta.client _LOG.debug('Opened new S3 connection.') def load_file_body(self, bucket_name, key): return self.resource.Object(bucket_name, key).get()['Body'].read() def download_file(self, bucket_name, key, file_path): self.resource.Bucket(bucket_name).download_file(key, file_path) def download_to_file(self, bucket_name, key, file): self.resource.Bucket(bucket_name).download_fileobj(key, file) def upload_file(self, storage, file_name, bucket_name, folder=''): """ Upload specific file to s3. :type bucket_name: str :type file_name: str :type storage: str :param storage: path (e.g. '/tmp/') :param folder: str """ self.client.upload_file(storage + file_name, bucket_name, folder + file_name) def upload_single_file(self, path, key, bucket, extra_args=None): """ Uploads file just like method above, but allows to specify object key as argument :param path: path to actual file on os :param key: object key, as it will be uploaded to bucket :param bucket: just bucket name :param extra_args: Extra arguments that may be passed to the client operation """ self.client.upload_file(path, bucket, key, ExtraArgs=extra_args) def put_object(self, file_obj, key, bucket, content_type, content_encoding=None): if content_encoding is None: self.resource.Bucket(bucket).put_object(Body=file_obj, ContentType=content_type, Key=key) else: self.resource.Bucket(bucket).put_object( Body=file_obj, ContentType=content_type, Key=key, ContentEncoding=content_encoding) def is_bucket_exists(self, name): """ Check if bucket exists by name. 
    def is_bucket_exists(self, name):
        """ Check if a bucket with the given name exists.

        :type name: str
        """
        res = self.get_list_buckets()
        existing_buckets = [each['Name'] for each in res]
        return name in existing_buckets

    def get_bucket_acl(self, bucket_name):
        try:
            return self.client.get_bucket_acl(Bucket=bucket_name)
        except ClientError as e:
            if 'NoSuchBucket' in str(e):
                pass  # valid exception
            else:
                raise e

    def get_bucket_location(self, bucket_name):
        try:
            return self.client.get_bucket_location(Bucket=bucket_name)
        except ClientError as e:
            if 'NoSuchBucket' in str(e):
                pass  # valid exception
            else:
                raise e

    def get_bucket_policy(self, bucket_name):
        try:
            return self.client.get_bucket_policy(Bucket=bucket_name)
        except ClientError as e:
            if 'NoSuchBucketPolicy' in str(e) or 'NoSuchBucket' in str(e):
                pass  # valid exception
            else:
                raise e

    def get_bucket_website(self, bucket_name):
        try:
            return self.client.get_bucket_website(Bucket=bucket_name)
        except ClientError as e:
            if 'NoSuchWebsiteConfiguration' in str(e) \
                    or 'NoSuchBucket' in str(e):
                pass  # valid exception
            else:
                raise e

    def is_file_exists(self, bucket_name, key):
        """ Check if a file exists.

        :type bucket_name: str
        :type key: str
        """
        return bool(self.retrieve_object_metadata(bucket_name, key))

    def create_bucket(self, bucket_name, acl=None, location=None):
        """
        :type bucket_name: str
        :param acl: private|public-read|public-read-write|authenticated-read
        :param location: region
        """
        param = dict(Bucket=bucket_name)
        param['ObjectOwnership'] = 'ObjectWriter'
        if acl:
            param['ACL'] = acl
        if not location:
            location = self.region
        valid_location = ['us-west-1', 'us-west-2', 'ca-central-1',
                          'eu-west-1', 'eu-west-2', 'eu-west-3',
                          'eu-central-1', 'eu-north-1', 'ap-south-1',
                          'ap-southeast-1', 'ap-southeast-2',
                          'ap-northeast-1', 'ap-northeast-2', 'sa-east-1',
                          'us-east-1', 'us-east-2']
        if location not in valid_location:
            raise AssertionError('Param "location" has invalid value. '
                                 'Valid locations: {0}'.format(
                                     valid_location))
        if location != 'us-east-1':  # us-east-1 is the default location
            param['CreateBucketConfiguration'] = {
                'LocationConstraint': location
            }
        self.client.create_bucket(**param)
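    # A usage sketch for create_bucket() (the bucket name is hypothetical;
    # conn is an S3Connection instance as in the sketch above). Note that
    # us-east-1 must not be sent as a LocationConstraint, which the method
    # already handles:
    #
    #     conn.create_bucket('my-example-bucket', acl='private',
    #                        location='eu-west-1')
    #     assert conn.is_bucket_exists('my-example-bucket')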
    def remove_bucket(self, bucket_name, log_not_found_error=True):
        """ Remove a bucket by name. A bucket must be empty before it can
        be removed, so its objects (or object versions, if versioning is
        enabled) are deleted first.
        The log_not_found_error parameter is needed for proper log handling
        in the retry decorator.
        """
        bucket = self.resource.Bucket(bucket_name)
        bucket_versioning = self.resource.BucketVersioning(bucket_name)
        if bucket_versioning.status == 'Enabled':
            bucket.object_versions.delete()
        else:
            bucket.objects.all().delete()
        bucket.delete()

    def delete_bucket(self, bucket_name):
        self.client.delete_bucket(Bucket=bucket_name)

    def add_lambda_event_source(self, bucket: str, lambda_arn: str,
                                event_source: dict):
        """ Create an event notification in the bucket that triggers the
        lambda.
        Note: two identical events can't be configured for two separate
        lambdas in one bucket.

        :param bucket:
        :param lambda_arn:
        :param event_source:
            - s3_events: list[str] - list of S3 event types:
                's3:ReducedRedundancyLostObject'
                's3:ObjectCreated:*'
                's3:ObjectCreated:Put'
                's3:ObjectCreated:Post'
                's3:ObjectCreated:Copy'
                's3:ObjectCreated:CompleteMultipartUpload'
                's3:ObjectRemoved:*'
                's3:ObjectRemoved:Delete'
                's3:ObjectRemoved:DeleteMarkerCreated'
            - filter_rules (optional): list[dict] - list of S3 event
              filters: {'Name': 'prefix'|'suffix', 'Value': 'string'}
        """
        config = self.get_bucket_notification(bucket_name=bucket)
        config.pop('ResponseMetadata')
        if 'LambdaFunctionConfigurations' not in config:
            config['LambdaFunctionConfigurations'] = []
        # S3 returns the filter rule name capitalized ('Prefix'/'Suffix'),
        # while it should be lowercase according to the documentation
        for lambda_config in config['LambdaFunctionConfigurations']:
            filter_rules = deep_get(
                lambda_config, ['Filter', 'Key', 'FilterRules'], [])
            for filter_rule in filter_rules:
                filter_rule['Name'] = filter_rule['Name'].lower()
        params = {
            'LambdaFunctionArn': lambda_arn,
            'Events': event_source['s3_events']
        }
        if event_source.get('filter_rules'):
            params.update({
                'Filter': {
                    'Key': {
                        'FilterRules': event_source['filter_rules']
                    }
                }
            })
        # add the event notification remotely only if it is not already
        # present
        for remote_event_source in config['LambdaFunctionConfigurations']:
            remote_event_source_copy = remote_event_source.copy()
            remote_event_source_copy.pop('Id')
            if remote_event_source_copy == params:
                break
        else:
            config['LambdaFunctionConfigurations'].append(params)
        self.put_bucket_notification(
            bucket_name=bucket, notification_configuration=config)
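    # An illustrative event_source value for add_lambda_event_source()
    # above (and its remove_* counterpart below); the bucket, ARN and
    # prefix are hypothetical. This triggers the lambda for every object
    # created under 'uploads/':
    #
    #     conn.add_lambda_event_source(
    #         bucket='my-example-bucket',
    #         lambda_arn='arn:aws:lambda:eu-west-1:123456789012'
    #                    ':function:my-fn',
    #         event_source={
    #             's3_events': ['s3:ObjectCreated:*'],
    #             'filter_rules': [{'Name': 'prefix', 'Value': 'uploads/'}]
    #         })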
    def remove_lambda_event_source(self, bucket: str, lambda_arn: str,
                                   event_source: dict):
        """ Remove the event notification in the bucket that triggers the
        lambda.
        Note: two identical events can't be configured for two separate
        lambdas in one bucket.

        :param bucket:
        :param lambda_arn:
        :param event_source:
            - s3_events: list[str] - list of S3 event types:
                's3:ReducedRedundancyLostObject'
                's3:ObjectCreated:*'
                's3:ObjectCreated:Put'
                's3:ObjectCreated:Post'
                's3:ObjectCreated:Copy'
                's3:ObjectCreated:CompleteMultipartUpload'
                's3:ObjectRemoved:*'
                's3:ObjectRemoved:Delete'
                's3:ObjectRemoved:DeleteMarkerCreated'
            - filter_rules (optional): list[dict] - list of S3 event
              filters: {'Name': 'prefix'|'suffix', 'Value': 'string'}
        """
        config = self.get_bucket_notification(bucket_name=bucket)
        config.pop('ResponseMetadata')
        if 'LambdaFunctionConfigurations' not in config:
            _LOG.info('No lambda event source to remove')
            return
        saved_lambda_configs = []
        for lambda_config in config['LambdaFunctionConfigurations']:
            # S3 returns the filter rule name capitalized
            # ('Prefix'/'Suffix'), while it should be lowercase according
            # to the documentation
            filter_rules = deep_get(
                lambda_config, ['Filter', 'Key', 'FilterRules'], [])
            for filter_rule in filter_rules:
                filter_rule['Name'] = filter_rule['Name'].lower()
            current_lambda = (
                    lambda_config['LambdaFunctionArn'] == lambda_arn)
            same_filters = (
                    filter_rules == event_source.get('filter_rules', []))
            same_events = (
                    lambda_config['Events'] == event_source['s3_events'])
            # keep the config if anything differs from the given meta
            if not current_lambda or not same_filters or not same_events:
                saved_lambda_configs.append(lambda_config)
        config['LambdaFunctionConfigurations'] = saved_lambda_configs
        self.put_bucket_notification(
            bucket_name=bucket, notification_configuration=config)

    def get_list_buckets(self):
        response = self.client.list_buckets()
        return response.get('Buckets')

    def add_bucket_policy(self, bucket_name, policy_document):
        """ Attach an inline policy to an existing bucket.

        :type bucket_name: str
        :type policy_document: str (json) or dict
        """
        if isinstance(policy_document, dict):
            policy_document = dumps(policy_document)
        self.client.put_bucket_policy(
            Bucket=bucket_name,
            Policy=policy_document
        )

    def add_bucket_rule(self, bucket_name, rule_document):
        """ Create a lifecycle rule for an existing bucket.

        :param bucket_name:
        :param rule_document:
        :return:
        """
        self.client.put_bucket_lifecycle_configuration(
            Bucket=bucket_name,
            LifecycleConfiguration=rule_document
        )

    def enable_website_hosting(self, bucket, index_doc, error_doc):
        """
        :type bucket: str
        :type index_doc: str
        :type error_doc: str
        """
        self.client.put_bucket_website(
            Bucket=bucket,
            WebsiteConfiguration={
                'ErrorDocument': {
                    'Key': error_doc
                },
                'IndexDocument': {
                    'Suffix': index_doc
                }
            }
        )

    def remove_object(self, bucket_name, file_name, mfa=None,
                      version_id=None, request_payer=None):
        """
        :type bucket_name: str
        :type file_name: str
        :type mfa: str
        :type version_id: str
        :type request_payer: str
        :return: response (dict)
        """
        params = dict(Bucket=bucket_name, Key=file_name)
        if mfa:
            params['MFA'] = mfa
        if version_id:
            params['VersionId'] = version_id
        if request_payer:
            params['RequestPayer'] = request_payer
        return self.client.delete_object(**params)

    def list_objects(self, bucket_name, delimiter=None, encoding_type=None,
                     prefix=None, request_payer=None):
        params = dict(Bucket=bucket_name)
        if delimiter:
            params['Delimiter'] = delimiter
        if encoding_type:
            params['EncodingType'] = encoding_type
        if prefix:
            params['Prefix'] = prefix
        if request_payer:
            params['RequestPayer'] = request_payer
        bucket_objects = []
        response = self.client.list_objects(**params)
        if response.get('Contents'):
            bucket_objects.extend(response.get('Contents'))
        # list_objects returns at most 1000 keys per call; while the
        # response is truncated, continue from NextMarker (present when a
        # delimiter is set) or from the last returned key
        while response.get('IsTruncated') and response.get('Contents'):
            params['Marker'] = response.get(
                'NextMarker', response['Contents'][-1]['Key'])
            response = self.client.list_objects(**params)
            if response.get('Contents'):
                bucket_objects.extend(response.get('Contents'))
        return bucket_objects
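    # A sketch of a policy document accepted by add_bucket_policy() above
    # (dicts are serialized to JSON automatically; the bucket name and
    # account id are hypothetical):
    #
    #     conn.add_bucket_policy('my-example-bucket', {
    #         'Version': '2012-10-17',
    #         'Statement': [{
    #             'Effect': 'Allow',
    #             'Principal': {'AWS': 'arn:aws:iam::123456789012:root'},
    #             'Action': 's3:GetObject',
    #             'Resource': 'arn:aws:s3:::my-example-bucket/*'
    #         }]
    #     })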
    def get_bucket_notification(self, bucket_name):
        try:
            return self.client.get_bucket_notification_configuration(
                Bucket=bucket_name
            )
        except ClientError as e:
            if 'AccessDenied' in str(e):
                _LOG.warning(f'{e}. Bucket name - \'{bucket_name}\'.')
            else:
                raise e

    def put_bucket_notification(self, bucket_name,
                                notification_configuration):
        self.client.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration=notification_configuration
        )

    def remove_bucket_notification(self, bucket_name):
        self.client.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={}
        )

    def list_keys(self, bucket_name):
        bucket = self.resource.Bucket(bucket_name)
        return [obj.key for obj in bucket.objects.all()]

    def get_keys_by_prefix(self, bucket_name, prefix):
        bucket = self.resource.Bucket(bucket_name)
        return [obj.key for obj in bucket.objects.filter(Prefix=prefix)]

    def list_object_versions(self, bucket_name, delimiter=None,
                             encoding_type=None, prefix=None):
        params = dict(Bucket=bucket_name)
        if delimiter:
            params['Delimiter'] = delimiter
        if encoding_type:
            params['EncodingType'] = encoding_type
        if prefix:
            params['Prefix'] = prefix
        bucket_objects = []
        response = self.client.list_object_versions(**params)
        versions = response.get('Versions', [])
        bucket_objects.extend(
            [{'Key': i['Key'], 'VersionId': i['VersionId']}
             for i in versions])
        key_marker = response.get('NextKeyMarker')
        version_marker = response.get('NextVersionIdMarker')
        while key_marker or version_marker:
            if key_marker:
                params['KeyMarker'] = key_marker
            if version_marker:
                params['VersionIdMarker'] = version_marker
            response = self.client.list_object_versions(**params)
            versions = response.get('Versions', [])
            bucket_objects.extend(
                [{'Key': i['Key'], 'VersionId': i['VersionId']}
                 for i in versions])
            # advance the markers, otherwise the loop never terminates
            key_marker = response.get('NextKeyMarker')
            version_marker = response.get('NextVersionIdMarker')
        return bucket_objects

    def list_object_markers(self, bucket_name, delimiter=None,
                            encoding_type=None, prefix=None):
        params = dict(Bucket=bucket_name)
        if delimiter:
            params['Delimiter'] = delimiter
        if encoding_type:
            params['EncodingType'] = encoding_type
        if prefix:
            params['Prefix'] = prefix
        bucket_objects = []
        response = self.client.list_object_versions(**params)
        delete_markers = response.get('DeleteMarkers', [])
        bucket_objects.extend(
            [{'Key': i['Key'], 'VersionId': i['VersionId']}
             for i in delete_markers])
        key_marker = response.get('NextKeyMarker')
        version_marker = response.get('NextVersionIdMarker')
        while key_marker or version_marker:
            if key_marker:
                params['KeyMarker'] = key_marker
            if version_marker:
                params['VersionIdMarker'] = version_marker
            response = self.client.list_object_versions(**params)
            delete_markers = response.get('DeleteMarkers', [])
            bucket_objects.extend(
                [{'Key': i['Key'], 'VersionId': i['VersionId']}
                 for i in delete_markers])
            # advance the markers, otherwise the loop never terminates
            key_marker = response.get('NextKeyMarker')
            version_marker = response.get('NextVersionIdMarker')
        return bucket_objects

    def retrieve_object_metadata(self, bucket_name, key):
        try:
            return self.client.head_object(Bucket=bucket_name, Key=key)
        except ClientError as e:
            # any HeadObject client error (404, 403) is treated as
            # 'metadata is not available' and None is returned
            if 'HeadObject' in str(e):
                pass  # valid exception
            else:
                raise e
        return

    def delete_objects(self, bucket_name, objects, mfa=None,
                       request_payer=None):
        params = dict(Bucket=bucket_name, Delete={'Objects': objects})
        if mfa:
            params['MFA'] = mfa
        if request_payer:
            params['RequestPayer'] = request_payer
        return self.client.delete_objects(**params)
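    # A sketch of fully purging a versioned prefix with the three methods
    # above (bucket and prefix are hypothetical). delete_objects() accepts
    # at most 1000 entries per call, hence the chunking:
    #
    #     to_delete = (
    #         conn.list_object_versions('my-bucket', prefix='tmp/')
    #         + conn.list_object_markers('my-bucket', prefix='tmp/'))
    #     for i in range(0, len(to_delete), 1000):
    #         conn.delete_objects('my-bucket', to_delete[i:i + 1000])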
""" boto_rules = [] for rule in rules: # converting rule to boto format for key in rule.keys(): if isinstance(rule[key], list) \ or isinstance(rule[key], int): pass # expected elif isinstance(rule[key], str): rule[key] = [rule[key]] else: raise AssertionError( 'Value of CORS rule attribute {0} has invalid ' 'value: {1}. Should be str, int or list'.format(key, rule[ key])) boto_rules.append(rule) # boto3 returns None here self.client.put_bucket_cors( Bucket=bucket_name, CORSConfiguration={ "CORSRules": boto_rules } ) _LOG.info('CORS configuration has been set to bucket {0}'.format( bucket_name)) def is_versioning_enabled(self, bucket_name): return self.client.get_bucket_versioning( Bucket=bucket_name) == 'Enabled' def put_public_access_block(self, bucket_name: str, block_public_acls: bool = True, ignore_public_acls: bool = True, block_public_policy: bool = True, restrict_public_buckets: bool = True): public_access_block_configuration = { 'BlockPublicAcls': block_public_acls, 'IgnorePublicAcls': ignore_public_acls, 'BlockPublicPolicy': block_public_policy, 'RestrictPublicBuckets': restrict_public_buckets } _LOG.info(f'Setting public access block for bucket: {bucket_name} ' f'with params: {public_access_block_configuration}') self.client.put_public_access_block( Bucket=bucket_name, PublicAccessBlockConfiguration=public_access_block_configuration ) _LOG.info(f'Public access block was set') def put_bucket_acl(self, bucket_name: str, acl: str): self.client.put_bucket_acl( Bucket=bucket_name, ACL=acl ) _LOG.info(f'ACL was set as \'{acl}\'')