syndicate/core/resources/s3_resource.py
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import ipaddress
import re
import string
from typing import Optional

from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.core import ClientError
from syndicate.core.constants import S3_BUCKET_ACL_LIST
from syndicate.core.helper import unpack_kwargs
from syndicate.core.resources.base_resource import BaseResource
from syndicate.core.resources.helper import build_description_obj, chunks

_LOG = get_logger(__name__)
USER_LOG = get_user_logger()


def validate_bucket_name(bucket_name: str):
"""Checks whether the given bucket name is valid.
    If the given name isn't valid, a ValueError with an appropriate message
    is raised.

    :type bucket_name: str
    :param bucket_name: the name to check
    :raises ValueError: if the given bucket name is not valid
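
    Example (illustrative values only)::

        validate_bucket_name('my-bucket-2024')  # valid, returns None
        validate_bucket_name('My_Bucket')       # raises ValueError (uppercase)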
"""
bucket_name = bucket_name.strip()
_LOG.info(f"Starting validating bucket name '{bucket_name}'")
error = None
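    # A name may be passed together with a key prefix ('bucket/prefix');
    # only the part before the first '/' is checked against the 3-63
    # character length limit; the remaining checks below run on the full
    # string.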
raw_bucket_name, _ = (bucket_name.split('/', 1)
if bucket_name and '/' in bucket_name
else (bucket_name, None))
if not raw_bucket_name or not 3 <= len(raw_bucket_name) <= 63:
error = 'Bucket name must be between 3 and 63 characters long'
else:
invalid_characters = re.search('[^a-z0-9.-]', bucket_name)
if invalid_characters:
character = invalid_characters.group()
if character in string.ascii_uppercase:
error = 'Bucket name must not contain uppercase characters'
else:
error = f'Bucket name contains invalid characters: {character}'
elif any(bucket_name.startswith(symbol) for symbol in '.-'):
error = 'Bucket name must start with a lowercase letter or number'
elif any(bucket_name.endswith(symbol) for symbol in '.-'):
error = 'Bucket name must not end with dash or period'
elif '..' in bucket_name:
error = 'Bucket name must not contain two adjacent periods'
elif '.-' in bucket_name or '-.' in bucket_name:
error = 'Bucket name must not contain dash next to period'
else:
try:
ipaddress.ip_address(bucket_name)
error = 'Bucket name must not resemble an IP address'
except ValueError:
pass
if error:
_LOG.warning(error)
raise ValueError(error)
_LOG.info(f"Finished validating bucket name '{bucket_name}'")


class S3Resource(BaseResource):
    def __init__(self, s3_conn, account_id) -> None:
        self.s3_conn = s3_conn
        self.account_id = account_id

    def create_s3_bucket(self, args):
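        """Creates S3 buckets: hands each item of args to
        _create_s3_bucket_from_meta via the resource pool."""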
return self.create_pool(self._create_s3_bucket_from_meta, args)

    def describe_bucket(self, name, meta):
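        """Describes an existing bucket: collects its ACL, location and, if
        present, its bucket policy, keyed by the bucket ARN (an empty dict
        is returned when the location cannot be resolved)."""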
arn = self.get_bucket_arn(name)
acl_response = self.s3_conn.get_bucket_acl(name)
location_response = self.s3_conn.get_bucket_location(name)
bucket_policy = self.s3_conn.get_bucket_policy(name)
if not location_response:
return {}
response = {
'bucket_acl': acl_response,
'location': location_response,
}
if bucket_policy:
response['policy'] = bucket_policy
return {
arn: build_description_obj(response, name, meta)
}

    @staticmethod
    def get_bucket_arn(name):
        return f'arn:aws:s3:::{name}'

    @unpack_kwargs
def _create_s3_bucket_from_meta(self, name, meta):
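        """Creates a single S3 bucket from its resource meta and applies the
        optional public access block, ACL, policy, website hosting,
        lifecycle rules and CORS configuration described there.

        A hedged sketch of the meta shape this method reads (the top-level
        keys come from the code below; the values are illustrative
        placeholders, not a schema definition)::

            {
                'location': 'eu-west-1',
                'public_access_block': {...},   # bool flags only
                'acl': 'private',
                'policy': {...},
                'website_hosting': {'enabled': True,
                                    'index_document': 'index.html',
                                    'error_document': 'error.html'},
                'LifecycleConfiguration': [...],
                'cors': [...]
            }
        """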
if self.s3_conn.is_bucket_exists(name):
            _LOG.warning('{0} bucket exists.'.format(name))
return self.describe_bucket(name, meta)
self.s3_conn.create_bucket(name, location=meta.get('location'))
_LOG.info('Created S3 bucket {0}.'.format(name))
public_access_block = meta.get('public_access_block', {})
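        # Every value inside public_access_block (if any) must be a boolean;
        # an empty dict results in put_public_access_block being called with
        # no explicit flags.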
if not all([isinstance(param, bool) for param in
public_access_block.values()]):
            message = ('Parameters inside public_access_block must be of '
                       'bool type')
            _LOG.error(message)
            raise AssertionError(message)
self.s3_conn.put_public_access_block(name,
**public_access_block)
acl = meta.get('acl')
if acl:
if acl not in S3_BUCKET_ACL_LIST:
                raise AssertionError(
                    f'Invalid S3 bucket ACL value! Must be one of: '
                    f'{S3_BUCKET_ACL_LIST}')
self.s3_conn.put_bucket_acl(name, acl)
policy = meta.get('policy')
if policy:
self.s3_conn.add_bucket_policy(name, policy)
_LOG.debug('Policy on {0} S3 bucket is set up.'.format(name))
        website_hosting = (meta.get('website_hosting') or {}).get('enabled')
if website_hosting:
index_document = meta['website_hosting'].get('index_document')
error_document = meta['website_hosting'].get('error_document')
if not all([isinstance(param, str) for param in (index_document,
error_document)]):
                raise AssertionError(
                    "Parameters 'index_document' and 'error_document' "
                    "must be of 'str' type")
self.s3_conn.enable_website_hosting(name,
index_document,
error_document)
_LOG.debug(f'Website hosting configured with parameters: '
f'\'index_document\': \'{index_document}\', '
f'\'error_document\': \'{error_document}\'')
website_endpoint = (
f'http://{name}.s3-website.{self.s3_conn.region}.amazonaws.com'
f'/{index_document}')
USER_LOG.info(f'Bucket website endpoint: {website_endpoint}')
rules = meta.get('LifecycleConfiguration')
if rules:
self.s3_conn.add_bucket_rule(name, rules)
_LOG.debug('Rules on {0} S3 bucket are set up.'.format(name))
cors_configuration = meta.get('cors')
if cors_configuration:
self.s3_conn.put_cors(bucket_name=name, rules=cors_configuration)
return self.describe_bucket(name, meta)

    def _delete_objects(self, bucket_name, keys):
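        """Deletes the given keys from the bucket and returns the
        Key/VersionId pairs that could not be deleted (an empty list means
        everything was removed)."""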
response = self.s3_conn.delete_objects(bucket_name, keys)
errors = response.get('Errors')
if errors:
error_keys = [{
'Key': i['Key'],
'VersionId': i['VersionId']
} for i in errors]
return error_keys
else:
return []

    def remove_buckets(self, args):
        return self.create_pool(self._remove_bucket, args)

    @unpack_kwargs
    def _remove_bucket(self, arn, config):
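        """Removes a single bucket; a bucket that no longer exists is
        treated as already removed and is still reported in the result."""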
bucket_name = config['resource_name']
try:
self.s3_conn.remove_bucket(bucket_name=bucket_name,
log_not_found_error=False)
_LOG.info('S3 bucket {0} was removed.'.format(bucket_name))
return {arn: config}
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                _LOG.warning(
                    'S3 bucket {0} is not found'.format(bucket_name))
                return {arn: config}
            raise

    def build_bucket_arn(self, maybe_arn: str) -> Optional[str]:
        """Returns an S3 bucket ARN built from the given name, the value
        itself if it already is a bucket ARN, or None for non-string
        input."""
        if not isinstance(maybe_arn, str):
            return None
if self.is_bucket_arn(maybe_arn):
return maybe_arn
return f'arn:aws:s3:::{maybe_arn}'

    @staticmethod
def is_bucket_arn(maybe_arn: str) -> bool:
arn_regex = r'^arn:aws:s3:::[a-z0-9.-]{3,63}(?:/.*)?$'
return bool(re.match(arn_regex, maybe_arn))
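

# Illustrative behaviour of the ARN helpers (example values only, not taken
# from this module):
#   is_bucket_arn('arn:aws:s3:::my-bucket')  -> True
#   is_bucket_arn('my-bucket')               -> False
#   build_bucket_arn('my-bucket')            -> 'arn:aws:s3:::my-bucket'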