docker/services/clients/s3.py

import json
import os

import boto3
from botocore.config import Config

from commons.constants import ENV_SERVICE_MODE, DOCKER_SERVICE_MODE, \
    ENV_MINIO_HOST, ENV_MINIO_PORT, ENV_MINIO_ACCESS_KEY, \
    ENV_MINIO_SECRET_ACCESS_KEY
from commons.log_helper import get_logger

UTF_8_ENCODING = 'utf-8'

_LOG = get_logger('s3client')


class S3Client:
    IS_DOCKER = os.getenv(ENV_SERVICE_MODE) == DOCKER_SERVICE_MODE

    def __init__(self, region):
        self.region = region
        self._client = None
        self._resource = None

    def build_config(self) -> Config:
        config = Config(retries={
            'max_attempts': 10,
            'mode': 'standard'
        })
        if self.IS_DOCKER:
            # Minio requires SigV4 signing and path-style addressing.
            config = config.merge(Config(s3={
                'signature_version': 's3v4',
                'addressing_style': 'path'
            }))
        return config

    def _init_clients(self):
        config = self.build_config()
        if self.IS_DOCKER:
            host, port = os.getenv(ENV_MINIO_HOST), os.getenv(ENV_MINIO_PORT)
            access_key = os.getenv(ENV_MINIO_ACCESS_KEY)
            secret_access_key = os.getenv(ENV_MINIO_SECRET_ACCESS_KEY)
            assert (host and port and access_key and secret_access_key), \
                f"'{ENV_MINIO_HOST}', '{ENV_MINIO_PORT}', " \
                f"'{ENV_MINIO_ACCESS_KEY}', " \
                f"'{ENV_MINIO_SECRET_ACCESS_KEY}' envs must be specified " \
                f"for on-prem"
            url = f'http://{host}:{port}'
            session = boto3.Session(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_access_key
            )
            self._client = session.client('s3', endpoint_url=url,
                                          config=config)
            self._resource = session.resource('s3', endpoint_url=url,
                                              config=config)
            _LOG.info('Minio connection was successfully initialized')
        else:  # saas
            self._client = boto3.client('s3', region_name=self.region,
                                        config=config)
            self._resource = boto3.resource('s3', region_name=self.region,
                                            config=config)
            _LOG.info('S3 connection was successfully initialized')

    @property
    def client(self):
        if not self._client:
            self._init_clients()
        return self._client

    @property
    def resource(self):
        if not self._resource:
            self._init_clients()
        return self._resource

    def file_exists(self, bucket_name, key):
        """
        Checks if an object with the given key exists in the bucket.
        :return: True if it exists, else False
        """
        # list_objects returns None for an empty bucket; fall back to [].
        existing_objects = self.list_objects(bucket_name=bucket_name) or []
        existing_keys = [obj['Key'] for obj in existing_objects]
        _LOG.debug('Available s3 keys: {0}'.format(existing_keys))
        return key in existing_keys

    def put_object(self, bucket_name, object_name, body):
        s3_object = self.resource.Object(bucket_name, object_name)
        return s3_object.put(Body=body, ContentEncoding=UTF_8_ENCODING)

    def is_bucket_exists(self, bucket_name):
        """
        Checks if the specified bucket exists.
        :param bucket_name: name of the bucket to check;
        :return: True if it exists, otherwise - False
        """
        existing_buckets = self._list_buckets()
        return bucket_name in existing_buckets

    def _list_buckets(self):
        response = self.client.list_buckets()
        return [bucket['Name'] for bucket in response.get('Buckets')]

    def get_json_file_content(self, bucket_name, full_file_name):
        """
        Returns the parsed JSON content of the object.
        :param bucket_name: name of the bucket.
        :param full_file_name: name of the file including its folders.
            Example: /folder1/folder2/file_name.json
        :return: content of the file
        """
        response = self.client.get_object(
            Bucket=bucket_name,
            Key=full_file_name
        )
        streaming_body = response.get('Body')
        if streaming_body:
            return json.loads(streaming_body.read())

    def get_json_lines_file_content(self, bucket_name, full_file_name):
        # Parses a JSON Lines file: one JSON document per line.
        response = self.client.get_object(
            Bucket=bucket_name,
            Key=full_file_name
        )
        streaming_body = response.get('Body')
        body = streaming_body.read()
        lines = body.decode().split('\n')
        return [json.loads(line) for line in lines if line.strip()]

    def get_file_content(self, bucket_name, full_file_name, decode=False):
        """
        Returns the raw content of the object.
        :param bucket_name: name of the bucket.
        :param full_file_name: name of the file including its folders.
            Example: /folder1/folder2/file_name.json
        :param decode: if True, the bytes are decoded as UTF-8
        :return: content of the file
        """
        response = self.client.get_object(
            Bucket=bucket_name,
            Key=full_file_name
        )
        streaming_body = response.get('Body')
        if decode:
            return streaming_body.read().decode(UTF_8_ENCODING)
        return streaming_body.read()

    def put_object_encrypted(self, bucket_name, object_name, body):
        return self.client.put_object(
            Body=body, Bucket=bucket_name, Key=object_name,
            ServerSideEncryption='AES256')

    def list_objects(self, bucket_name, prefix=None):
        result_keys = []
        params = dict(Bucket=bucket_name)
        if prefix:
            params['Prefix'] = prefix
        response = self.client.list_objects_v2(**params)
        if not response.get('Contents'):
            return None
        result_keys.extend(response['Contents'])
        # list_objects_v2 returns at most 1000 items per call; follow the
        # continuation token until the listing is complete.
        while response['IsTruncated']:
            token = response['NextContinuationToken']
            params['ContinuationToken'] = token
            response = self.client.list_objects_v2(**params)
            result_keys.extend(response['Contents'])
        return result_keys

    def list_objects_gen(self, bucket_name, prefix=None, only_keys=False):
        # Generator variant of list_objects: yields items (or just keys)
        # lazily instead of accumulating the whole listing in memory.
        params = dict(Bucket=bucket_name)
        if prefix:
            params['Prefix'] = prefix
        response = self.client.list_objects_v2(**params)
        if not response.get('Contents'):
            return
        for item in response['Contents']:
            yield item['Key'] if only_keys else item
        while response['IsTruncated']:
            token = response['NextContinuationToken']
            params['ContinuationToken'] = token
            response = self.client.list_objects_v2(**params)
            for item in response['Contents']:
                yield item['Key'] if only_keys else item

    def delete_file(self, bucket_name, file_key):
        self.resource.Object(bucket_name, file_key).delete()

    def generate_presigned_url(self, bucket_name, full_file_name,
                               client_method='get_object', http_method='GET',
                               expires_in_sec=300):
        return self.client.generate_presigned_url(
            ClientMethod=client_method,
            Params={
                'Bucket': bucket_name,
                'Key': full_file_name,
            },
            ExpiresIn=expires_in_sec,
            HttpMethod=http_method
        )

    def generate_presigned_post_url(self, bucket_name, object_name,
                                    fields=None, conditions=None,
                                    expiration=300):
        return self.client.generate_presigned_post(
            bucket_name, object_name, Fields=fields,
            Conditions=conditions, ExpiresIn=expiration)

    def list_dir(self, bucket_name, key):
        objects = self.list_objects(bucket_name, key)
        if objects:
            return [obj['Key'] for obj in objects]
        return []

    def download_file(self, bucket_name, full_file_name, output_folder_path):
        file_name = os.path.split(full_file_name)[-1]
        output_file_path = os.path.join(output_folder_path, file_name)
        with open(output_file_path, 'wb') as f:
            content = self.get_file_content(
                bucket_name=bucket_name,
                full_file_name=full_file_name
            )
            f.write(content)
        return output_file_path

    def create_bucket(self, bucket_name, region=None):
        region = region or self.region
        if not region or region == 'us-east-1':
            # S3 rejects a LocationConstraint of 'us-east-1'; in that case
            # CreateBucketConfiguration must be omitted entirely.
            self.client.create_bucket(Bucket=bucket_name)
            return
        self.client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': region
            }
        )
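

# Illustrative usage sketch, not part of the original module: the region,
# bucket name, and prefix below are hypothetical values chosen for
# demonstration. In Docker mode the Minio env vars declared in
# commons.constants must be set before the first client access.
if __name__ == '__main__':
    client = S3Client(region='eu-west-1')  # assumed region
    bucket = 'example-bucket'              # assumed bucket name
    if client.is_bucket_exists(bucket):
        # Stream keys lazily; list_objects_gen follows continuation tokens.
        for key in client.list_objects_gen(bucket, prefix='reports/',
                                           only_keys=True):
            print(key)
        # Presigned GET URL valid for the default 300 seconds.
        print(client.generate_presigned_url(bucket, 'reports/latest.json'))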