dusty/tools/depots/minio/depot.py (106 lines of code) (raw):

#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011

#   Copyright 2019 getcarrier.io
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""
    MinIO depot
"""

import io
import minio  # pylint: disable=E0401
import urllib3  # pylint: disable=E0401

from dusty.models.depot import ObjectDepotModel
from dusty.tools import log


class Depot(ObjectDepotModel):
    """
    MinIO depot class

    Stores and retrieves objects in a single bucket of a MinIO / S3-compatible
    endpoint. The bucket name comes from the "bucket" config option and falls
    back to "carrier" when the option is not set.
    """

    def __init__(self, context, config):
        """
        Initialize depot instance

        :param context: value stored as-is on ``self.context``
        :param config: mapping with depot options; must contain "endpoint"
        :raises ValueError: when a required option is missing from config
        """
        super().__init__()
        self.context = context
        # Validate before storing so a bad config never ends up on the instance
        self.validate_config(config)
        self.config = config
        self.client = self._create_minio_client()

    def _bucket_name(self):
        """ Return configured bucket name ("carrier" when not configured) """
        return self.config.get("bucket", "carrier")

    @staticmethod
    def _make_http_client(cert_reqs, ca_certs=None):
        """
        Build urllib3 pool manager with the retry policy used by this depot

        :param cert_reqs: certificate requirement mode passed to urllib3
        :param ca_certs: CA bundle path; only passed on when not None
        :return: urllib3.PoolManager instance
        """
        pool_kwargs = {
            "timeout": urllib3.Timeout.DEFAULT_TIMEOUT,
            "cert_reqs": cert_reqs,
            "maxsize": 10,
            "retries": urllib3.Retry(
                total=5,
                backoff_factor=0.2,
                status_forcelist=[500, 502, 503, 504]
            ),
        }
        if ca_certs is not None:
            pool_kwargs["ca_certs"] = ca_certs
        return urllib3.PoolManager(**pool_kwargs)

    def _create_minio_client(self):
        """
        Create MinIO client from ``self.config`` and probe the bucket

        "ssl_verify" handling:
          - string: verify certificates against that CA bundle path
          - falsy (or option absent): certificate verification is disabled
          - any other truthy value: no custom HTTP client is passed to minio

        NOTE(review): verification is OFF when "ssl_verify" is absent, while
        the sample config produced by fill_config sets it to True. Confirm
        whether the insecure default is intended before changing it.

        :return: minio.Minio client instance
        """
        ssl_verify = self.config.get("ssl_verify", False)
        http_client = None
        # The string check goes first: a string value always ends up with
        # CERT_REQUIRED (even an empty one), everything else falsy disables
        # verification
        if isinstance(ssl_verify, str):
            http_client = self._make_http_client("CERT_REQUIRED", ca_certs=ssl_verify)
        elif not ssl_verify:
            http_client = self._make_http_client("CERT_NONE")
        client = minio.Minio(
            endpoint=self.config["endpoint"],
            access_key=self.config.get("access_key", None),
            secret_key=self.config.get("secret_key", None),
            secure=self.config.get("secure", True),
            region=self.config.get("region", None),
            http_client=http_client
        )
        # Test client auth
        client.bucket_exists(self._bucket_name())
        return client

    @staticmethod
    def _release_response(response):
        """ Close response and return its connection to the pool (best-effort) """
        try:
            response.close()
            response.release_conn()
        except Exception:  # pylint: disable=W0703
            # Cleanup problems must not change the result of get_object
            pass

    def get_object(self, key):
        """
        Get object by key

        :param key: object name inside the configured bucket
        :return: object content as returned by ``read()``, or None on any failure
        """
        response = None
        try:
            response = self.client.get_object(self._bucket_name(), key)
            return response.read()
        except Exception:  # pylint: disable=W0703
            # Any failure is reported to the caller as "no object"
            return None
        finally:
            # The response holds a pooled connection until closed and released
            if response is not None:
                self._release_response(response)

    def put_object(self, key, data):
        """
        Put object by key

        :param key: object name inside the configured bucket
        :param data: content to store; str values are encoded as UTF-8
        :return: True on success, False when the upload failed (error is logged)
        """
        try:
            if isinstance(data, str):
                data = data.encode("utf-8")
            data_obj = io.BytesIO(data)
            self.client.put_object(self._bucket_name(), key, data_obj, len(data))
            return True
        except Exception:  # pylint: disable=W0703
            log.exception("Failed to put object")
            return False

    @staticmethod
    def fill_config(data_obj):
        """
        Make sample config

        Appends every supported option with a sample value and a comment.

        :param data_obj: config container providing ``insert(pos, key, value, comment=...)``
        """
        sample_options = (
            ("endpoint", "minio.example.com:9000",
             "S3 object storage endpoint"),
            ("bucket", "carrier",
             "Carrier bucket name"),
            ("access_key", "ACCESSKEYVALUE",
             "(optional) Access key for the object storage endpoint"),
            ("secret_key", "SECRETACCESSKEYVALUE",
             "(optional) Secret key for the object storage endpoint."),
            ("secure", True,
             "(optional) Set this value to True to enable secure (HTTPS) access"),
            ("ssl_verify", True,
             "(optional) Verify SSL certificate: True, False or path to CA bundle"),
            ("region", "us-east-1",
             "(optional) Set this value to override automatic bucket location discovery"),
        )
        for option, value, comment in sample_options:
            # len(data_obj) is re-read on every step so each option is appended last
            data_obj.insert(len(data_obj), option, value, comment=comment)

    @staticmethod
    def validate_config(config):
        """
        Validate config

        :param config: mapping with depot options
        :raises ValueError: when a required option is not present
        """
        required = ["endpoint"]
        not_set = [item for item in required if item not in config]
        if not_set:
            error = f"Required configuration options not set: {', '.join(not_set)}"
            log.error(error)
            raise ValueError(error)

    @staticmethod
    def get_name():
        """ Module name """
        return "minio"

    @staticmethod
    def get_description():
        """ Module description or help message """
        return "MinIO depot"