def _create_s3_bucket_from_meta()

in syndicate/core/resources/s3_resource.py [0:0]


    def _create_s3_bucket_from_meta(self, name, meta):
        """Create the S3 bucket ``name`` and configure it from ``meta``.

        If the bucket already exists nothing is changed and its description
        is returned. Otherwise the sections of ``meta`` that are checked
        here are validated first, so an invalid definition is rejected
        before any bucket is created, and then applied in this order:
        public access block, ACL, policy, website hosting, lifecycle rules
        and CORS.

        :param name: name of the bucket to create
        :param meta: dict with the bucket definition; the keys read here are
            ``location``, ``public_access_block``, ``acl``, ``policy``,
            ``website_hosting``, ``LifecycleConfiguration`` and ``cors``
        :return: the result of ``self.describe_bucket(name, meta)``
        :raises AssertionError: if a value inside ``public_access_block`` is
            not a bool, if ``acl`` is set but not in ``S3_BUCKET_ACL_LIST``,
            or if website hosting is enabled and ``index_document`` /
            ``error_document`` are not both strings
        """
        if self.s3_conn.is_bucket_exists(name):
            _LOG.warning('{0} bucket exists.'.format(name))
            return self.describe_bucket(name, meta)

        # Validate the whole definition before touching AWS: raising after
        # create_bucket() would leave a half-configured bucket behind.
        # `or {}` also covers a key that is present but explicitly null.
        public_access_block = meta.get('public_access_block') or {}
        if not all(isinstance(param, bool)
                   for param in public_access_block.values()):
            message = 'Parameters inside public_access_block should have ' \
                      'bool type'
            _LOG.error(message)
            raise AssertionError(message)

        acl = meta.get('acl')
        if acl and acl not in S3_BUCKET_ACL_LIST:
            raise AssertionError(
                f'Invalid value of S3 bucket ACL! Must be one of the '
                f'{S3_BUCKET_ACL_LIST}')

        website_meta = meta.get('website_hosting') or {}
        website_enabled = website_meta.get('enabled')
        index_document = website_meta.get('index_document')
        error_document = website_meta.get('error_document')
        if website_enabled and not all(
                isinstance(param, str)
                for param in (index_document, error_document)):
            raise AssertionError('Parameters \'index_document\' and '
                                 '\'error_document\' must be \'str\' type')

        self.s3_conn.create_bucket(name, location=meta.get('location'))
        _LOG.info('Created S3 bucket {0}.'.format(name))

        # Called even when no parameters are given, as before; the
        # connection wrapper decides what an empty call means.
        self.s3_conn.put_public_access_block(name,
                                             **public_access_block)

        if acl:
            self.s3_conn.put_bucket_acl(name, acl)

        policy = meta.get('policy')
        if policy:
            self.s3_conn.add_bucket_policy(name, policy)
            _LOG.debug('Policy on {0} S3 bucket is set up.'.format(name))

        if website_enabled:
            self.s3_conn.enable_website_hosting(name,
                                                index_document,
                                                error_document)
            _LOG.debug(f'Website hosting configured with parameters: '
                       f'\'index_document\': \'{index_document}\', '
                       f'\'error_document\': \'{error_document}\'')
            # NOTE(review): AWS documents two website endpoint formats,
            # `s3-website.<region>` and `s3-website-<region>`, depending on
            # the region; only the dot form is produced here - confirm it
            # is correct for every region this tool targets.
            website_endpoint = (
                f'http://{name}.s3-website.{self.s3_conn.region}.amazonaws.com'
                f'/{index_document}')
            USER_LOG.info(f'Bucket website endpoint: {website_endpoint}')

        rules = meta.get('LifecycleConfiguration')
        if rules:
            self.s3_conn.add_bucket_rule(name, rules)
            _LOG.debug('Rules on {0} S3 bucket are set up.'.format(name))

        cors_configuration = meta.get('cors')
        if cors_configuration:
            self.s3_conn.put_cors(bucket_name=name, rules=cors_configuration)
        return self.describe_bucket(name, meta)