# docker/services/storage_service.py
import os
from datetime import datetime, timedelta, date
from glob import glob
import concurrent.futures
from bson import ObjectId
from bson.errors import InvalidId
from mongoengine.errors import DoesNotExist, ValidationError
from commons.constants import SERVICE_ATTR, \
JOB_STEP_DOWNLOAD_METRICS, CSV_EXTENSION, META_FILE_NAME, ALL
from commons.exception import ExecutorException
from commons.log_helper import get_logger
from commons.profiler import profiler
from models.storage import Storage, StorageServiceEnum, S3Storage
from services.clients.s3 import S3Client
_LOG = get_logger('r8s-storage-service')
DATE_FORMAT = '%Y-%m-%d'
class StorageService:
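    """Service layer for Storage entities: CRUD helpers plus transfer of
    metric and job result files between the local filesystem and S3."""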
def __init__(self, s3_client: S3Client):
self.s3_client = s3_client
self.storage_service_class_mapping = {
StorageServiceEnum.S3_BUCKET: S3Storage
}
@staticmethod
def list():
return list(Storage.objects.all())
def get(self, identifier: str):
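        """Resolve a storage by ObjectId when the identifier is a valid
        BSON id, otherwise fall back to a lookup by name."""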
_LOG.debug(f'Describing storage by identifier: \'{identifier}\'')
        try:
            _LOG.debug('Trying to interpret the identifier as a BSON ObjectId')
            ObjectId(identifier)
            _LOG.debug('Describing storage by id')
            return self.get_by_id(object_id=identifier)
        except InvalidId:
            _LOG.debug('Describing storage by name')
            return self.get_by_name(name=identifier)
@staticmethod
def get_by_id(object_id):
try:
return Storage.objects.get(id=object_id)
except (DoesNotExist, ValidationError):
return None
@staticmethod
def get_by_name(name: str):
try:
return Storage.objects.get(name=name)
except (DoesNotExist, ValidationError):
return None
    def create(self, storage_data: dict):
        storage_service = storage_data.get(SERVICE_ATTR)
        storage_class = self.storage_service_class_mapping.get(storage_service)
        if not storage_class:
            raise ValueError(
                f'Unsupported storage service: \'{storage_service}\'')
        storage = storage_class(**storage_data)
        return storage
@staticmethod
def save(storage: Storage):
storage.save()
@staticmethod
def delete(storage: Storage):
storage.delete()
    @profiler(execution_step='s3_download_tenant_metrics')
def download_metrics(self, data_source: Storage, output_path: str,
scan_customer, scan_clouds, scan_tenants,
scan_from_date, scan_to_date, max_days):
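        """Dispatch the metrics download to the handler registered for the
        storage type; currently only S3-backed storages are supported."""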
type_downloader_mapping = {
S3Storage: self._download_metrics_s3
}
downloader = type_downloader_mapping.get(data_source.__class__)
if not downloader:
raise ExecutorException(
step_name=JOB_STEP_DOWNLOAD_METRICS,
reason=f'No downloader available for storage class '
f'\'{data_source.__class__}\''
)
return downloader(data_source, output_path, scan_customer,
scan_clouds, scan_tenants, scan_from_date,
scan_to_date, max_days)
def _download_metrics_s3(self, data_source: S3Storage, output_path,
scan_customer, scan_clouds, scan_tenants,
scan_from_date=None, scan_to_date=None,
max_days=None):
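        """List metric (.csv) and meta files under the composed S3 paths,
        filter them by the requested scan date folders and download them
        concurrently into `output_path`, mirroring the bucket layout."""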
access = data_source.access
prefix = access.prefix
bucket_name = access.bucket_name
paths = self._build_s3_paths(prefix=prefix,
scan_customer=scan_customer,
scan_clouds=scan_clouds,
scan_tenants=scan_tenants)
        _LOG.debug(f'Listing objects in bucket \'{bucket_name}\' '
                   f'from paths: \'{paths}\'')
objects = []
if paths:
for path in paths:
files = self.s3_client.list_objects(bucket_name=bucket_name,
prefix=path)
if files:
objects.extend(files)
else:
files = self.s3_client.list_objects(bucket_name=bucket_name)
objects.extend(files)
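        # Keep only per-day metric CSV files and folder meta files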
objects = [obj for obj in objects if
obj.get('Key').endswith(CSV_EXTENSION)
or obj.get('Key').endswith(f'/{META_FILE_NAME}')]
if not scan_from_date and max_days:
            _LOG.debug(f'Scan start date is not specified. Going to use '
                       f'the algorithm limit of {max_days} days')
scan_start_dt = datetime.utcnow() - timedelta(days=max_days)
scan_from_date = scan_start_dt.strftime(DATE_FORMAT)
filter_only_dates = self.get_scan_dates_list(
scan_from_date=scan_from_date,
scan_to_date=scan_to_date
)
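        # The parent folder name of each key is expected to be the metric
        # date in %Y-%m-%d format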
if filter_only_dates:
objects = [obj for obj in objects if obj['Key'].split('/')[-2]
in filter_only_dates]
_LOG.debug(f'{len(objects)} metric/meta files found, downloading')
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for file in objects:
                path = file.get('Key').split('/')
                # Strip the storage prefix so the local layout mirrors
                # the bucket layout relative to that prefix
                if path and path[0] == prefix:
                    path = path[1:]
                path = '/'.join(path[:-1])
                output_folder_path = '/'.join((output_path, path))
                os.makedirs(output_folder_path, exist_ok=True)
                futures.append(executor.submit(
                    self.s3_client.download_file,
                    bucket_name=bucket_name,
                    full_file_name=file.get('Key'),
                    output_folder_path=output_folder_path
                ))
            # Surface exceptions raised inside worker threads instead of
            # silently discarding failed downloads
            for future in concurrent.futures.as_completed(futures):
                future.result()
    @profiler(execution_step='s3_upload_job_results')
def upload_job_results(self, job_id, storage: Storage,
results_folder_path, tenant=None):
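        """Dispatch the job results upload to the handler registered for
        the storage type; currently only S3-backed storages are supported."""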
type_uploader_mapping = {
S3Storage: self._upload_job_results_s3
}
        uploader = type_uploader_mapping.get(storage.__class__)
        if not uploader:
            raise ExecutorException(
                step_name=JOB_STEP_DOWNLOAD_METRICS,
                reason=f'No uploader available for storage class '
                       f'\'{storage.__class__}\''
            )
        return uploader(job_id, storage, results_folder_path, tenant)
def _upload_job_results_s3(self, job_id, storage, results_folder_path,
tenant=None):
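        """Upload all .jsonl result files found under `results_folder_path`
        (optionally limited to a single tenant folder) to
        `<prefix>/<job_id>/...` in the storage bucket."""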
access = storage.access
prefix = access.prefix
bucket_name = access.bucket_name
if prefix:
s3_folder_path = os.path.join(prefix, job_id)
else:
s3_folder_path = job_id
        _LOG.debug(f'Uploading job results to bucket \'{bucket_name}\'. '
                   f'Prefix: \'{prefix}\'')
files = [y for x in os.walk(results_folder_path)
for y in glob(os.path.join(x[0], '*.jsonl'))]
if tenant:
files = [file for file in files if file.split('/')[-2] == tenant]
for file in files:
file_key = file.replace(results_folder_path, '').strip('/')
s3_file_key = os.path.join(s3_folder_path, file_key)
with open(file, 'r') as f:
body = f.read()
self.s3_client.put_object(
bucket_name=bucket_name,
object_name=s3_file_key,
body=body
)
@staticmethod
def _build_s3_paths(prefix, scan_customer, scan_clouds, scan_tenants):
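        """Compose `prefix/customer/cloud/tenant` S3 key prefixes for the
        requested clouds and tenants; when tenants are omitted or include
        ALL, a cloud-level prefix is built instead."""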
path_lst = []
paths = []
if prefix:
path_lst.append(prefix)
if scan_customer:
path_lst.append(scan_customer)
for scan_cloud in scan_clouds:
if scan_tenants and ALL not in scan_tenants:
for tenant in scan_tenants:
tenant_path = path_lst.copy()
tenant_path.append(scan_cloud)
tenant_path.append(tenant)
paths.append('/'.join(tenant_path))
else:
cloud_path = path_lst.copy()
cloud_path.append(scan_cloud)
paths.append('/'.join(cloud_path))
if not paths and path_lst:
paths.append('/'.join(path_lst))
return paths
def get_scan_dates_list(self, scan_from_date: str = None,
scan_to_date: str = None):
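        """Build the list of `%Y-%m-%d` date strings from `scan_from_date`
        up to `scan_to_date`, inclusive; when the stop date is missing or
        invalid, it defaults to tomorrow."""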
        if not scan_from_date:
            _LOG.warning('No start date provided.')
            return
        try:
            start_dt = datetime.strptime(scan_from_date, DATE_FORMAT)
        except ValueError:
            _LOG.warning(f'Invalid scan start date: {scan_from_date} '
                         f'must match the {DATE_FORMAT} pattern.')
            return
        try:
            end_dt = datetime.strptime(scan_to_date, DATE_FORMAT)
        except (ValueError, TypeError):
            _LOG.warning(f'Invalid/empty scan stop date: {scan_to_date} '
                         f'must match the {DATE_FORMAT} pattern.')
            end_d = date.today() + timedelta(days=1)
            end_dt = datetime.combine(end_d, datetime.min.time())
            _LOG.debug(f'Default stop date will be used: {end_dt.isoformat()}')
dt_list = self.__date_range(start_date=start_dt, end_date=end_dt)
return [dt.strftime(DATE_FORMAT) for dt in dt_list]
@staticmethod
def __date_range(start_date, end_date):
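        """Return a datetime for each day between `start_date` and
        `end_date`, both endpoints included."""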
return [start_date + timedelta(n)
for n in range(int((end_date - start_date).days) + 1)]
def upload_profile_log(self, job_id, storage: Storage, file_path):
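        """Upload the profiler log file to `<prefix>/<job_id>/<file_name>`
        in the storage bucket."""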
access = storage.access
prefix = access.prefix
bucket_name = access.bucket_name
if prefix:
s3_folder_path = os.path.join(prefix, job_id)
else:
s3_folder_path = job_id
s3_file_key = os.path.join(s3_folder_path, file_path.split('/')[-1])
with open(file_path, 'r') as f:
body = f.read()
self.s3_client.put_object(
bucket_name=bucket_name,
object_name=s3_file_key,
body=body
)
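

# Example usage (a minimal sketch: the `S3Client` construction arguments and
# the storage name below are assumptions, not part of this module):
#
#     service = StorageService(s3_client=S3Client())
#     storage = service.get('metrics-storage')
#     dates = service.get_scan_dates_list(scan_from_date='2023-01-01',
#                                         scan_to_date='2023-01-07')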