workflows/pipe-common/scripts/mount_storage.py (784 lines of code) (raw):

# Copyright 2017-2021 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # CP_S3_FUSE_STAT_CACHE (default: 1m0s) # CP_S3_FUSE_TYPE_CACHE (default: 1m0s) # CP_S3_FUSE_ENSURE_DISKFREE (default: None) # CP_S3_FUSE_TYPE: goofys/s3fs (default: goofys) import argparse import platform import re import os import time import traceback from abc import ABCMeta, abstractmethod from pipeline import PipelineAPI, Logger, common, DataStorageWithShareMount, AclClass, APIError READ_MASK = 1 WRITE_MASK = 1 << 1 DTS = 'DTS' EXEC_ENVIRONMENT = 'EXEC_ENVIRONMENT' NFS_TYPE = 'NFS' S3_TYPE = 'S3' AZ_TYPE = 'AZ' GCP_TYPE = 'GS' MOUNT_DATA_STORAGES = 'MountDataStorages' S3_SCHEME = 's3://' AZ_SCHEME = 'az://' NFS_SCHEME = 'nfs://' GCP_SCHEME = 'gs://' FUSE_GOOFYS_ID = 'goofys' FUSE_S3FS_ID = 's3fs' FUSE_PIPE_ID = 'pipefuse' FUSE_GCSFUSE_ID = 'gcsfuse' FUSE_NA_ID = None AZURE_PROVIDER = 'AZURE' S3_PROVIDER = 'S3' READ_ONLY_MOUNT_OPT = 'ro' MOUNT_LIMITS_NONE = 'none' MOUNT_LIMITS_USER_DEFAULT = 'user_default' MOUNT_LIMITS_PARENT_FOLDER_ID = 'folder:' SENSITIVE_POLICY_PREFERENCE = 'storage.mounts.nfs.sensitive.policy' STORAGE_MOUNT_OPTIONS_ENV_PREFIX = 'CP_CAP_MOUNT_OPTIONS_' STORAGE_MOUNT_PATH_ENV_PREFIX = 'CP_CAP_MOUNT_PATH_' class MountOptions: def __init__(self, mount_params, mount_path): self.mount_params = mount_params self.mount_path = mount_path class PermissionHelper: def __init__(self): pass @classmethod def is_storage_readable(cls, storage): return 
cls.is_permission_set(storage, READ_MASK) @classmethod def is_storage_mount_disabled(cls, storage): return storage.mount_disabled is True # Checks that tool with its version for the current run is allowed to mount this storage # or there is no configured restriction for this storage @classmethod def is_storage_available_for_mount(cls, storage, run): if not storage.tools_to_mount: return True if not run or not run.get("actualDockerImage"): return False tool = run["actualDockerImage"] re_result = re.search(r"([^/]+)/([^:]+):?(:?.*)", tool) registry, image, version = re_result.groups() for tool_to_mount in storage.tools_to_mount: if registry == tool_to_mount["registry"] and image == tool_to_mount["image"]: tool_versions_to_mount = tool_to_mount.get('versions', []) return not tool_versions_to_mount or version in [v["version"] for v in tool_versions_to_mount] return False @classmethod def is_storage_writable(cls, storage): write_permission_granted = cls.is_permission_set(storage, WRITE_MASK) if not cls.is_run_sensitive(): return write_permission_granted if storage.sensitive: return write_permission_granted else: return False @classmethod def is_permission_set(cls, storage, mask): return storage.mask & mask == mask @classmethod def is_run_sensitive(cls): sensitive_run = os.getenv('CP_SENSITIVE_RUN', "false") return sensitive_run.lower() == 'true' class MountStorageTask: def __init__(self, task): self.api = PipelineAPI(os.environ['API'], 'logs') self.run_id = int(os.getenv('RUN_ID', 0)) self.region_id = int(os.getenv('CLOUD_REGION_ID', 0)) self.task_name = task self.available_storages = None if platform.system() == 'Windows': available_mounters = [S3Mounter, GCPMounter] else: available_mounters = [NFSMounter, S3Mounter, AzureMounter, GCPMounter] self.mounters = {mounter.type(): mounter for mounter in available_mounters} def parse_storage(self, placeholder, available_storages): storage_id = None try: if placeholder.lower() == MOUNT_LIMITS_USER_DEFAULT: user_info = 
self.api.load_current_user() if 'defaultStorageId' in user_info: storage_id = int(user_info['defaultStorageId']) Logger.info('User default storage is parsed as {}'.format(str(storage_id)), task_name=self.task_name) elif placeholder.lower() == MOUNT_LIMITS_NONE: Logger.info('{} placeholder found while parsing storage id, skipping it'.format(MOUNT_LIMITS_NONE), task_name=self.task_name) elif placeholder.lower().startswith(MOUNT_LIMITS_PARENT_FOLDER_ID): folder_parts = placeholder.split(':') parent_folder_id = 0 if len(folder_parts) == 2: try: parent_folder_id = int(folder_parts[1]) except: Logger.warn('Unable to parse {} placeholder to a parent folder ID, cannot convert it to integer.'.format(placeholder), task_name=self.task_name) if parent_folder_id != 0 and self.available_storages: storage_id = [ x.storage.id for x in self.available_storages if x.storage.parentId == parent_folder_id ] if len(storage_id) == 0: storage_id = None else: Logger.info('Parent folder ID {} is parsed into {} storage IDs'.format(placeholder, storage_id), task_name=self.task_name) else: Logger.warn('Unable to parse {} placeholder to a parent folder ID, it is malformed.'.format(placeholder), task_name=self.task_name) else: storage_identifier = placeholder.strip() if storage_identifier.isdigit(): return int(storage_identifier) if available_storages and available_storages.get(storage_identifier): return int(available_storages.get(storage_identifier)) except Exception as parse_storage_ex: Logger.warn('Unable to parse {} placeholder to a storage ID: {}.'.format(placeholder, str(parse_storage_ex)), task_name=self.task_name) return storage_id def parse_storage_list(self, csv_storages, available_storages): result = [] for item in csv_storages.split(','): storage_id = self.parse_storage(item, available_storages) if storage_id: if type(storage_id) is list: result = result + storage_id else: result.append(storage_id) return result # Any conditions to wait for before starting the mount procedure def 
wait_before_mount(self): try: wait_before_mount_attempts = int(os.getenv('CP_SENSITIVE_RUN_WAIT_POD_IP_SEC', 10)) wait_before_mount_timeout_sec = 3 # 1. If it's a sensitive job - we shall be sure that a "Run" has a Pod IP assigned. # Otherwise we won't be able to mount sensitive storages. API will consider this job as "outside of the sensitive context" is_sensitive_job = os.getenv('CP_SENSITIVE_RUN') if is_sensitive_job == 'true': Logger.info('A sensitive job detected, will wait for the Pod IP assignment', task_name=self.task_name) current_wait_iteration = 1 while current_wait_iteration <= wait_before_mount_attempts: current_run = self.api.load_run(self.run_id) if not current_run: Logger.warn('Cannot load run info, while waiting for the sensitive pod IP assignment. Will not wait anymore', task_name=self.task_name) break else: if 'podIP' in current_run and current_run['podIP'] != '' and current_run['podIP'] != None: Logger.info('Pod IP is assigned, proceeding further', task_name=self.task_name) break Logger.info('Pod IP is NOT available yet, waiting...', task_name=self.task_name) current_wait_iteration = current_wait_iteration + 1 time.sleep(wait_before_mount_timeout_sec) # 2. ... Add more conditions here ... # ... except Exception as e: Logger.fail('An error occured while waiting for the mounts prerequisites: {}.'.format(str(e.message)), task_name=self.task_name) def run(self, mount_root, tmp_dir): try: Logger.info('Starting mounting remote data storages.', task_name=self.task_name) self.wait_before_mount() if not self.region_id: Logger.warn('Cannot filter storages by region because cloud region id is not configured. 
' 'No file storages will be mounted and object storages from all regions will be mounted.', task_name=self.task_name) if self.run_id: Logger.info('Fetching run info...', task_name=self.task_name) run = self.api.load_run(self.run_id) else: Logger.warn('Cannot load run info because run id is not configured.', task_name=self.task_name) run = None Logger.info('Fetching list of allowed storages...', task_name=self.task_name) available_storages_with_mounts = self.api.load_available_storages_with_share_mount(self.region_id or None) # filtering out shared storage folders, as they cause "overlapped" mounts # and break the "original" storage mountpoint, # storage is shared folder of another storage if it has source_storage_id available_storages_with_mounts = [ x for x in available_storages_with_mounts if not x.storage.source_storage_id ] storages_ids_by_path = {x.storage.path: x.storage.id for x in available_storages_with_mounts} additional_mount_options = dict(self._load_mount_options_from_environ()) # filtering out all nfs storages if region id is missing if not self.region_id: available_storages_with_mounts = [x for x in available_storages_with_mounts if x.storage.storage_type != NFS_TYPE] # Backup a full filtered list, before removing storage to be skipped # We can further refer to this list, if any storage detail is required # Otherwise we need to call API once again self.available_storages = list(available_storages_with_mounts) # Filter out storages, which are requested to be skipped # NOTE: Any storage, defined by CP_CAP_FORCE_MOUNTS will still be mounted skip_storages = os.getenv('CP_CAP_SKIP_MOUNTS') if skip_storages: Logger.info('Storage(s) "{}" requested to be skipped'.format(skip_storages), task_name=self.task_name) skip_storages_list = self.parse_storage_list(skip_storages, storages_ids_by_path) available_storages_with_mounts = [x for x in available_storages_with_mounts if x.storage.id not in skip_storages_list ] # If the storages are limited by the user - we 
make sure that the "forced" storages are still available # This is useful for the tools, which require "databases" or other data from the File/Object storages force_storages = os.getenv('CP_CAP_FORCE_MOUNTS') force_storages_list = [] if force_storages: Logger.info('Storage(s) "{}" forced to be mounted even if the storage mounts list is limited'.format(force_storages), task_name=self.task_name) force_storages_list = self.parse_storage_list(force_storages, storages_ids_by_path) limited_storages = os.getenv('CP_CAP_LIMIT_MOUNTS') if limited_storages: # Append "forced" storage to the "limited" list, if it's set if force_storages: force_storages = ','.join([str(x) for x in force_storages_list]) limited_storages = ','.join([limited_storages, force_storages]) try: limited_storages_list = [] if limited_storages.lower() != MOUNT_LIMITS_NONE: limited_storages_list = self.parse_storage_list(limited_storages, storages_ids_by_path) # Remove duplicates from the `limited_storages_list`, as they can be introduced by `force_storages` or a user's typo limited_storages_list = list(set(limited_storages_list)) available_storages_with_mounts = [x for x in available_storages_with_mounts if x.storage.id in limited_storages_list] # append sensitive storages since they are not returned in common mounts loaded_mounts = {} for storage_id in limited_storages_list: storage = self.api.find_datastorage(str(storage_id)) if storage.sensitive: available_storages_with_mounts.append( DataStorageWithShareMount(storage, self._get_storage_mount(storage, loaded_mounts))) Logger.info('Run is launched with mount limits ({}) Only {} storages will be mounted'.format( limited_storages, len(available_storages_with_mounts)), task_name=self.task_name) except Exception as limited_storages_ex: Logger.warn('Unable to parse CP_CAP_LIMIT_MOUNTS value({}) with error: {}.'.format(limited_storages, str(limited_storages_ex)), task_name=self.task_name) traceback.print_exc() if not available_storages_with_mounts: 
Logger.success('No remote storages are available or CP_CAP_LIMIT_MOUNTS configured to none', task_name=self.task_name) return Logger.info('Found {} available storage(s). Checking mount options.'.format(len(available_storages_with_mounts)), task_name=self.task_name) sensitive_policy = None sensitive_policy_preference = self.api.get_preference(SENSITIVE_POLICY_PREFERENCE) if sensitive_policy_preference and 'value' in sensitive_policy_preference: sensitive_policy = sensitive_policy_preference['value'] for mounter in [mounter for mounter in self.mounters.values()]: storage_count_by_type = len(list(filter((lambda dsm: dsm.storage.storage_type == mounter.type()), available_storages_with_mounts))) if storage_count_by_type > 0: mounter.check_or_install(self.task_name, sensitive_policy) mounter.init_tmp_dir(tmp_dir, self.task_name) storages_metadata = self._collect_storages_metadata(available_storages_with_mounts) if all([not mounter.is_available() for mounter in self.mounters.values()]): Logger.success('Mounting of remote storages is not available for this image', task_name=self.task_name) return initialized_mounters = [] for storage_and_mount in available_storages_with_mounts: if not PermissionHelper.is_storage_readable(storage_and_mount.storage): Logger.info('Storage is not readable', task_name=self.task_name) continue if PermissionHelper.is_storage_mount_disabled(storage_and_mount.storage): Logger.info('Storage disabled for mounting, skipping.', task_name=self.task_name) continue if not PermissionHelper.is_storage_available_for_mount(storage_and_mount.storage, run): storage_not_allowed_msg = 'Storage {} is not allowed for {} image'\ .format(storage_and_mount.storage.name, (run or {}).get("actualDockerImage", "")) if storage_and_mount.storage.id in force_storages_list: Logger.info(storage_not_allowed_msg + ', but it is forced to be mounted', task_name=self.task_name) else: Logger.info(storage_not_allowed_msg + ', skipping it', task_name=self.task_name) continue 
storage_metadata = storages_metadata.get(storage_and_mount.storage.id, {}) mounter = self.mounters[storage_and_mount.storage.storage_type](self.api, storage_and_mount.storage, storage_metadata, storage_and_mount.file_share_mount, sensitive_policy, additional_mount_options.get(storage_and_mount.storage.id)) \ if storage_and_mount.storage.storage_type in self.mounters else None if not mounter: Logger.warn('Unsupported storage type {}.'.format(storage_and_mount.storage.storage_type), task_name=self.task_name) elif mounter.is_available(): initialized_mounters.append(mounter) initialized_mounters.sort(key=lambda mnt: mnt.build_mount_point(mount_root)) failed_storages = [] for mnt in initialized_mounters: try: mnt.mount(mount_root, self.task_name) except RuntimeError: Logger.warn('Data storage {} mounting has failed: {}' .format(mnt.storage.name, traceback.format_exc()), task_name=self.task_name) failed_storages.append(mnt.storage.name) if failed_storages: Logger.fail('The following data storages have not been mounted: {}'.format(', '.join(failed_storages)), task_name=self.task_name) exit(1) Logger.success('Finished data storage mounting', task_name=self.task_name) except Exception: Logger.fail('Unhandled error during mount task: {}.'.format(traceback.format_exc()), task_name=self.task_name) exit(1) def _get_storage_mount(self, storage, loaded_mounts): if storage.file_share_mount_id is None: return None mount_id = int(storage.file_share_mount_id) if mount_id in loaded_mounts: return loaded_mounts[mount_id] file_share_mount = self.api.load_file_share_mount(mount_id) loaded_mounts[mount_id] = file_share_mount return file_share_mount def _load_mount_options_from_environ(self): result = {} for env_name, env_value in os.environ.items(): if env_name.startswith(STORAGE_MOUNT_OPTIONS_ENV_PREFIX) and not env_name.endswith('_PARAM_TYPE'): storage_id = env_name[len(STORAGE_MOUNT_OPTIONS_ENV_PREFIX):] if storage_id.isdigit(): storage_id = int(storage_id) if storage_id not in result: 
result[storage_id] = MountOptions(env_value, os.environ.get(STORAGE_MOUNT_PATH_ENV_PREFIX + str(storage_id), None)) if env_name.startswith(STORAGE_MOUNT_PATH_ENV_PREFIX) and not env_name.endswith('_PARAM_TYPE'): storage_id = env_name[len(STORAGE_MOUNT_PATH_ENV_PREFIX):] if storage_id.isdigit(): storage_id = int(storage_id) if storage_id not in result: result[storage_id] = MountOptions(os.environ.get(STORAGE_MOUNT_OPTIONS_ENV_PREFIX + str(storage_id), None), env_value) return result def _collect_storages_metadata(self, available_storages_with_mounts): storages_metadata_raw = self._load_storages_metadata_raw(available_storages_with_mounts) storages_metadata = dict(self._prepare_storages_metadata(storages_metadata_raw)) return storages_metadata def _load_storages_metadata_raw(self, available_storages_with_mounts): try: storage_ids = [storage_and_mount.storage.id for storage_and_mount in available_storages_with_mounts] return self.api.load_all_metadata_efficiently(storage_ids, AclClass.DATA_STORAGE) except APIError as e: Logger.warn('Storages metadata loading has failed {}.'.format(str(e)), task_name=self.task_name) traceback.print_exc() return [] def _prepare_storages_metadata(self, storages_metadata): for metadata_entry in storages_metadata: storage_id = metadata_entry.get('entity', {}).get('entityId', 0) storage_metadata_raw = metadata_entry.get('data', {}) storage_metadata = { metadata_key: metadata_obj.get('value') for metadata_key, metadata_obj in storage_metadata_raw.items() } yield storage_id, storage_metadata class StorageMounter: __metaclass__ = ABCMeta _cached_regions = [] def __init__(self, api, storage, metadata, share_mount, sensitive_policy, mount_options=None): self.api = api self.storage = storage self.metadata = metadata self.share_mount = share_mount self.sensitive_policy = sensitive_policy self.mount_options = mount_options @staticmethod @abstractmethod def scheme(): pass @staticmethod @abstractmethod def type(): pass @staticmethod @abstractmethod 
def check_or_install(task_name, sensitive_policy): pass @staticmethod @abstractmethod def is_available(): pass @staticmethod @abstractmethod def init_tmp_dir(tmp_dir, task_name): pass @staticmethod def execute_and_check_command(command, task_name=MOUNT_DATA_STORAGES): install_check, _, stderr = common.execute_cmd_command_and_get_stdout_stderr(command, silent=False) if install_check != 0: Logger.warn('Installation script {command} failed: \n {stderr}'.format(command=command, stderr=stderr), task_name=task_name) return install_check == 0 @staticmethod def create_directory(path, task_name): try: expanded_path = os.path.expandvars(path) if not os.path.exists(expanded_path): os.makedirs(expanded_path) return True except: Logger.warn('Failed to create mount directory: {path}'.format(path=path), task_name=task_name) traceback.print_exc() return False def mount(self, mount_root, task_name): mount_point = self.build_mount_point(mount_root) if not self.create_directory(mount_point, task_name): return params = self.build_mount_params(mount_point) mount_command = self.build_mount_command(params) self.execute_mount(mount_command, params, task_name) def build_mount_point(self, mount_root): mount_point = self.storage.mount_point if self.mount_options and self.mount_options.mount_path: return self.mount_options.mount_path if mount_point is None: mount_point = os.path.join(mount_root, self.get_path()) return mount_point @abstractmethod def build_mount_params(self, mount_point): pass @abstractmethod def build_mount_command(self, params): pass @staticmethod def execute_mount(command, params, task_name): if os.getenv('CP_CAP_MOUNT_SKIP_EXISTING', 'false').lower() == 'true' and not StorageMounter.is_windows(): check_command = "/bin/mount | awk '{{ print $1 \" \" $3 }}' | grep '{path} {mount}'".format(**params) exit_code = common.execute_cmd_command(check_command, executable=None if StorageMounter.is_windows() else '/bin/bash') if exit_code == 0: Logger.info('-->{path} already mounted 
to {mount}'.format(**params), task_name=task_name) return exit_code = common.execute_cmd_command(command, executable=None if StorageMounter.is_windows() else '/bin/bash') if exit_code == 0: Logger.info('-->{path} mounted to {mount}'.format(**params), task_name=task_name) else: Logger.warn('--> Failed mounting {path} to {mount}'.format(**params), task_name=task_name) raise RuntimeError('Failed mounting {path} to {mount}'.format(**params)) def get_path(self): return self.storage.path.replace(self.scheme(), '', 1) def _get_credentials(self, storage): return self._get_credentials_by_region_id(storage.region_id) def _get_credentials_by_region_id(self, region_id): account_id = os.getenv('CP_ACCOUNT_ID_{}'.format(region_id)) account_key = os.getenv('CP_ACCOUNT_KEY_{}'.format(region_id)) account_region = os.getenv('CP_ACCOUNT_REGION_{}'.format(region_id)) account_token = os.getenv('CP_ACCOUNT_TOKEN_{}'.format(region_id)) if not account_id or not account_key or not account_region: raise RuntimeError('Account information wasn\'t found in the environment for account with id={}.' 
.format(region_id)) return account_id, account_key, account_region, account_token @staticmethod def is_windows(): return platform.system() == 'Windows' class AzureMounter(StorageMounter): available = False fuse_tmp = '/tmp' @staticmethod def scheme(): return AZ_SCHEME @staticmethod def type(): return AZ_TYPE @staticmethod def check_or_install(task_name, sensitive_policy): AzureMounter.available = StorageMounter.execute_and_check_command('install_azure_fuse_blobfuse', task_name=task_name) @staticmethod def is_available(): return AzureMounter.available @staticmethod def init_tmp_dir(tmp_dir, task_name): fuse_tmp = os.path.join(tmp_dir, "blobfuse") if StorageMounter.create_directory(fuse_tmp, task_name): AzureMounter.fuse_tmp = fuse_tmp def mount(self, mount_root, task_name): self.__resolve_azure_blob_service_url(task_name=task_name) super(AzureMounter, self).mount(mount_root, task_name) def build_mount_params(self, mount_point): account_id, account_key, _, _ = self._get_credentials(self.storage) mount_options = '' if self.mount_options and self.mount_options.mount_params: mount_options = self.mount_options.mount_params elif self.storage.mount_options: mount_options = self.storage.mount_options return { 'mount': mount_point, 'path': self.get_path(), 'tmp_dir': os.path.join(self.fuse_tmp, str(self.storage.id)), 'account_name': account_id, 'account_key': account_key, 'permissions': 'rw' if PermissionHelper.is_storage_writable(self.storage) else 'ro', 'mount_options': mount_options } def build_mount_command(self, params): return 'AZURE_STORAGE_ACCOUNT="{account_name}" ' \ 'AZURE_STORAGE_ACCESS_KEY="{account_key}" ' \ 'blobfuse "{mount}" ' \ '--container-name="{path}" ' \ '--tmp-path="{tmp_dir}" ' \ '-o "{permissions}" ' \ '-o allow_other ' \ '{mount_options}'.format(**params) def __resolve_azure_blob_service_url(self, task_name=MOUNT_DATA_STORAGES): # add resolved ip address for azure blob service to /etc/hosts (only once per account_name) account_name, _, _, _ = 
self._get_credentials(self.storage) command = 'etc_hosts_clear="$(sed -E \'/.*{account_name}.blob.core.windows.net.*/d\' /etc/hosts)" ' \ '&& cat > /etc/hosts <<< "$etc_hosts_clear" ' \ '&& getent hosts {account_name}.blob.core.windows.net >> /etc/hosts'.format(account_name=account_name) exit_code, _, stderr = common.execute_cmd_command_and_get_stdout_stderr(command, silent=True) if exit_code != 0: Logger.warn('Azure BLOB service hostname resolution and writing to /etc/hosts failed: \n {}'.format(stderr), task_name=task_name) class S3Mounter(StorageMounter): fuse_type = FUSE_NA_ID fuse_tmp = '/tmp' @staticmethod def scheme(): return S3_SCHEME @staticmethod def type(): return S3_TYPE @staticmethod def check_or_install(task_name, sensitive_policy): S3Mounter.fuse_type = S3Mounter._check_or_install(task_name) @staticmethod def _check_or_install(task_name): fuse_type = os.getenv('CP_S3_FUSE_TYPE', FUSE_GOOFYS_ID) if fuse_type == FUSE_GOOFYS_ID: fuse_installed = StorageMounter.execute_and_check_command('install_s3_fuse_goofys', task_name=task_name) return FUSE_GOOFYS_ID if fuse_installed else FUSE_NA_ID elif fuse_type == FUSE_S3FS_ID: fuse_installed = StorageMounter.execute_and_check_command('install_s3_fuse_s3fs', task_name=task_name) if fuse_installed: return FUSE_S3FS_ID else: Logger.warn( "FUSE {fuse_type} was preferred, but failed to install, will try to setup default goofys".format( fuse_type=fuse_type), task_name=task_name) fuse_installed = StorageMounter.execute_and_check_command('install_s3_fuse_goofys', task_name=task_name) return FUSE_GOOFYS_ID if fuse_installed else FUSE_NA_ID elif fuse_type == FUSE_PIPE_ID: return FUSE_PIPE_ID else: Logger.warn("FUSE {fuse_type} type is not defined for S3 fuse".format(fuse_type=fuse_type), task_name=task_name) return FUSE_NA_ID @staticmethod def is_available(): return S3Mounter.fuse_type is not None @staticmethod def init_tmp_dir(tmp_dir, task_name): fuse_tmp = os.path.join(tmp_dir, "s3fuse") if 
StorageMounter.create_directory(fuse_tmp, task_name): S3Mounter.fuse_tmp = fuse_tmp def build_mount_params(self, mount_point): mask = '0774' permissions = 'rw' stat_cache = os.getenv('CP_S3_FUSE_STAT_CACHE', '1m0s') type_cache = os.getenv('CP_S3_FUSE_TYPE_CACHE', '1m0s') mount_timeout = os.getenv('CP_PIPE_FUSE_MOUNT_TIMEOUT', 10000) aws_key_id, aws_secret, region_name, session_token = self._get_credentials(self.storage) path_chunks = self.storage.path.split('/') bucket = path_chunks[0] relative_path = '/'.join(path_chunks[1:]) if len(path_chunks) > 1 else '' if not PermissionHelper.is_storage_writable(self.storage): mask = '0554' permissions = 'ro' if self.is_windows(): logging_file = os.path.join(os.getenv('LOG_DIR', 'c:\\logs'), 'fuse_{}.log'.format(self.storage.id)) else: logging_file = '/var/log/fuse_{}.log'.format(self.storage.id) return {'mount': mount_point, 'storage_id': str(self.storage.id), 'path': self.get_path(), 'mask': mask, 'permissions': permissions, 'region_name': region_name, 'stat_cache': stat_cache, 'type_cache': type_cache, 'fuse_type': self.fuse_type, 'aws_key_id': aws_key_id, 'aws_secret': aws_secret, 'aws_token': session_token, 'tmp_dir': self.fuse_tmp, 'bucket': bucket, 'relative_path': relative_path, 'mount_timeout': mount_timeout, 'logging_file': logging_file } def remove_prefix(self, text, prefix): if text.startswith(prefix): return text[len(prefix):] return text def build_mount_command(self, params): if params['aws_token'] is not None or params['fuse_type'] == FUSE_PIPE_ID: pipe_mount_options = os.getenv('CP_PIPE_FUSE_MOUNT_OPTIONS') mount_options = self.mount_options.mount_params if self.mount_options and self.mount_options.mount_params \ else os.getenv('CP_PIPE_FUSE_OPTIONS') persist_logs = os.getenv('CP_PIPE_FUSE_PERSIST_LOGS', 'false').lower() == 'true' debug_libfuse = os.getenv('CP_PIPE_FUSE_DEBUG_LIBFUSE', 'false').lower() == 'true' logging_level = os.getenv('CP_PIPE_FUSE_LOGGING_LEVEL') merged_options = '-o allow_other' if 
debug_libfuse: merged_options = merged_options + ',debug' if mount_options: if mount_options.startswith('-c'): merged_options = merged_options + ' ' + mount_options else: merged_options = merged_options + ',' + self.remove_prefix(mount_options.strip(), '-o').strip() if logging_level: params['logging_level'] = logging_level return ('pipe storage mount {mount} -b {path} -t --mode 775 -w {mount_timeout} ' + ('-l {logging_file} ' if persist_logs else '') + ('-v {logging_level} ' if logging_level else '') + merged_options + ' ' + (pipe_mount_options if pipe_mount_options else '')).format(**params) elif params['fuse_type'] == FUSE_GOOFYS_ID: params['path'] = '{bucket}:{relative_path}'.format(**params) if params['relative_path'] else params['path'] return 'AWS_ACCESS_KEY_ID={aws_key_id} AWS_SECRET_ACCESS_KEY={aws_secret} nohup goofys ' \ '--dir-mode {mask} --file-mode {mask} -o {permissions} -o allow_other ' \ '--stat-cache-ttl {stat_cache} --type-cache-ttl {type_cache} ' \ '--acl "bucket-owner-full-control" ' \ '-f --gid 0 --region "{region_name}" {path} {mount} > {logging_file} 2>&1 &'.format(**params) elif params['fuse_type'] == FUSE_S3FS_ID: params['path'] = '{bucket}:/{relative_path}'.format(**params) if params['relative_path'] else params['path'] ensure_diskfree_size = os.getenv('CP_S3_FUSE_ENSURE_DISKFREE') params["ensure_diskfree_option"] = "" if ensure_diskfree_size is None else "-o ensure_diskfree=" + ensure_diskfree_size return 'AWSACCESSKEYID={aws_key_id} AWSSECRETACCESSKEY={aws_secret} s3fs {path} {mount} -o use_cache={tmp_dir} ' \ '-o umask=0000 -o {permissions} -o allow_other -o enable_noobj_cache ' \ '-o endpoint="{region_name}" -o url="https://s3.{region_name}.amazonaws.com" {ensure_diskfree_option} ' \ '-o default_acl="bucket-owner-full-control" ' \ '-o dbglevel="info" -f > {logging_file} 2>&1 &'.format(**params) else: return 'exit 1' class GCPMounter(StorageMounter): credentials = None fuse_type = FUSE_NA_ID fuse_tmp = '/tmp' @staticmethod def scheme(): 
return GCP_SCHEME @staticmethod def type(): return GCP_TYPE @staticmethod def check_or_install(task_name, sensitive_policy): GCPMounter.fuse_type = GCPMounter._check_or_install(task_name) @staticmethod def _check_or_install(task_name): fuse_type = os.getenv('CP_GCS_FUSE_TYPE', FUSE_GCSFUSE_ID) if fuse_type == FUSE_GCSFUSE_ID: fuse_installed = StorageMounter.execute_and_check_command('install_gcsfuse', task_name=task_name) return FUSE_GCSFUSE_ID if fuse_installed else FUSE_NA_ID elif fuse_type == FUSE_PIPE_ID: return FUSE_PIPE_ID else: Logger.warn("FUSE {fuse_type} type is not defined for GSC fuse".format(fuse_type=fuse_type), task_name=task_name) return FUSE_NA_ID @staticmethod def is_available(): return GCPMounter.fuse_type is not None @staticmethod def init_tmp_dir(tmp_dir, task_name): fuse_tmp = os.path.join(tmp_dir, "gcsfuse") if StorageMounter.create_directory(fuse_tmp, task_name): GCPMounter.fuse_tmp = fuse_tmp def mount(self, mount_root, task_name): super(GCPMounter, self).mount(mount_root, task_name) def build_mount_params(self, mount_point): mount_timeout = os.getenv('CP_PIPE_FUSE_MOUNT_TIMEOUT', 10000) gcp_creds_content, _ = self._get_credentials(self.storage) if gcp_creds_content: creds_named_pipe_path = "<(echo \'{gcp_creds_content}\')".format(gcp_creds_content=gcp_creds_content) else: creds_named_pipe_path = None mask = '0774' permissions = 'rw' if not PermissionHelper.is_storage_writable(self.storage): mask = '0554' permissions = 'ro' if self.is_windows(): logging_file = os.path.join(os.getenv('LOG_DIR', 'c:\\logs'), 'fuse_{}.log'.format(self.storage.id)) else: logging_file = '/var/log/fuse_{}.log'.format(self.storage.id) return {'mount': mount_point, 'storage_id': str(self.storage.id), 'path': self.get_path(), 'mask': mask, 'permissions': permissions, 'fuse_type': self.fuse_type, 'tmp_dir': self.fuse_tmp, 'credentials': creds_named_pipe_path, 'mount_timeout': mount_timeout, 'logging_file': logging_file } def build_mount_command(self, params): if not 
params['credentials'] or params['fuse_type'] == FUSE_PIPE_ID: persist_logs = os.getenv('CP_PIPE_FUSE_PERSIST_LOGS', 'false').lower() == 'true' debug_libfuse = os.getenv('CP_PIPE_FUSE_DEBUG_LIBFUSE', 'false').lower() == 'true' logging_level = os.getenv('CP_PIPE_FUSE_LOGGING_LEVEL') if logging_level: params['logging_level'] = logging_level return ('pipe storage mount {mount} -b {path} -t --mode 775 -w {mount_timeout} ' + ('-l {logging_file} ' if persist_logs else '') + ('-v {logging_level} ' if logging_level else '') + ('-o allow_other,debug ' if debug_libfuse else '-o allow_other ') ).format(**params) else: return 'nohup gcsfuse --foreground -o {permissions} -o allow_other --key-file {credentials} --temp-dir {tmp_dir} ' \ '--dir-mode {mask} --file-mode {mask} --implicit-dirs {path} {mount} > {logging_file} 2>&1 &'.format(**params) def _get_credentials(self, storage): account_region = os.getenv('CP_ACCOUNT_REGION_{}'.format(storage.region_id)) account_cred_file_content = os.getenv('CP_CREDENTIALS_FILE_CONTENT_{}'.format(storage.region_id)) if not account_region: raise RuntimeError('Account information wasn\'t found in the environment for account with id={}.' 
class NFSMounter(StorageMounter):
    """Mounter that builds ``/bin/mount`` commands for share mounts.

    The share mount type selects the filesystem passed to ``mount -t``:
    ``SMB`` maps to ``cifs``, ``LUSTRE`` to ``lustre`` and anything else to
    ``nfs``. Availability is kept on the class: ``check_or_install`` stores it
    in ``NFSMounter.available`` and ``is_available`` reports it.
    """

    available = False

    @staticmethod
    def scheme():
        """Return the path scheme handled by this mounter."""
        return NFS_SCHEME

    @staticmethod
    def type():
        """Return the storage type handled by this mounter."""
        return NFS_TYPE

    @staticmethod
    def check_or_install(task_name, sensitive_policy):
        """Decide whether the mounter is available and remember the result.

        For a sensitive run with the ``SKIP`` policy the mounter is marked as
        unavailable without running anything; otherwise the result of the
        ``install_nfs_client`` command is stored.
        """
        if PermissionHelper.is_run_sensitive() and sensitive_policy == "SKIP":
            NFSMounter.available = False
        else:
            NFSMounter.available = StorageMounter.execute_and_check_command('install_nfs_client',
                                                                            task_name=task_name)

    @staticmethod
    def init_tmp_dir(tmp_dir, task_name):
        """Do nothing: this mounter performs no temporary directory setup."""
        pass

    @staticmethod
    def is_available():
        """Return the availability recorded by ``check_or_install``."""
        return NFSMounter.available

    @staticmethod
    def _append_option(existing, extra):
        """Return ``extra`` joined to ``existing`` with a comma, or ``extra`` alone if ``existing`` is empty."""
        return existing + ',' + extra if existing else extra

    def build_mount_point(self, mount_root):
        """Resolve the local directory the share is mounted to.

        Precedence: explicit ``mount_path`` of the mount options, then the
        storage's own ``mount_point``, then a path derived from the storage
        path under ``mount_root``.
        """
        custom = self.mount_options
        if custom and custom.mount_path:
            return custom.mount_path
        configured = self.storage.mount_point
        if configured is not None:
            return configured
        # The share path looks like "srv:/some/path"; only its first ':' is dropped
        # before it is appended to the mount root.
        return os.path.join(mount_root, self.get_path().replace(':', '', 1))

    def build_mount_params(self, mount_point):
        """Return the ``mount``/``path`` parameters consumed by ``build_mount_command``."""
        return {'mount': mount_point, 'path': self.get_path()}

    def build_mount_command(self, params):
        """Compose the shell command that mounts the share.

        :param params: dict with ``path`` and ``mount`` keys. For SMB shares
            ``params['path']`` is updated in place to start with ``//``.
        :return: the mount command string, followed by a best-effort ``chmod``
            of the mount point when the storage is writable and not sensitive.
        """
        # Option precedence: explicit mount options, then storage, then share mount.
        explicit = self.mount_options.mount_params if self.mount_options else None
        options = explicit or self.storage.mount_options or self.share_mount.mount_options

        share_region = self.share_mount.region_id
        region_id = "" if share_region is None else str(share_region)
        if os.getenv("CP_CLOUD_PROVIDER_" + region_id) == "AZURE" and self.share_mount.mount_type == "SMB":
            # SMB shares in an Azure region get the account credentials as mount options.
            account_id, account_key, _, _ = self._get_credentials_by_region_id(region_id)
            credentials = "username=" + account_id + "," + "password=" + account_key
            options = self._append_option(options, credentials)

        mount_type = self.share_mount.mount_type
        is_smb = mount_type == "SMB"
        if is_smb and not params['path'].startswith("//"):
            params['path'] = '//' + params['path']
        protocol = {"SMB": "cifs", "LUSTRE": "lustre"}.get(mount_type, "nfs")
        command = '/bin/mount -t {protocol}'.format(protocol=protocol)

        permission = str(self.metadata.get('chmod', 'g+rwx'))
        writable = PermissionHelper.is_storage_writable(self.storage)
        if writable:
            mask = '0774'
        else:
            mask = '0554'
            # Make sure the read-only flag is present exactly once.
            if not options:
                options = READ_ONLY_MOUNT_OPT
            elif READ_ONLY_MOUNT_OPT not in options.split(','):
                options += ',{0}'.format(READ_ONLY_MOUNT_OPT)

        if is_smb:
            mode_options = 'file_mode={mode},dir_mode={mode}'.format(mode=mask)
            options = self._append_option(options, mode_options)

        options = self.append_timeout_options(options)
        if options:
            command += ' -o {}'.format(options)
        command += ' {path} {mount}'.format(**params)
        if writable and not self.storage.sensitive:
            command += ' && ( chmod {permission} {mount} || true )'.format(permission=permission, **params)
        return command

    def append_timeout_options(self, mount_options):
        """Add ``retry``/``timeo`` options for sensitive runs under the ``TIMEOUT`` policy.

        The options are returned untouched for SMB shares, for non-sensitive
        runs and for any policy other than ``TIMEOUT``. Otherwise ``retry``
        (``CP_FS_MOUNT_ATTEMPT``, default 0) is added unless the text ``retry``
        already occurs in the options, and, except for LUSTRE shares, ``timeo``
        (``CP_FS_MOUNT_TIMEOUT``, default 7) is added under the same rule.
        """
        mount_type = self.share_mount.mount_type
        applies = mount_type != 'SMB' and PermissionHelper.is_run_sensitive() \
            and self.sensitive_policy == "TIMEOUT"
        if not applies:
            return mount_options

        if not (mount_options and 'retry' in mount_options):
            attempts = os.getenv('CP_FS_MOUNT_ATTEMPT', 0)
            mount_options = self._append_option(mount_options, 'retry={}'.format(attempts))

        if self.share_mount.mount_type == 'LUSTRE':
            return mount_options

        if not (mount_options and 'timeo' in mount_options):
            timeout = os.getenv('CP_FS_MOUNT_TIMEOUT', 7)
            mount_options = self._append_option(mount_options, 'timeo={}'.format(timeout))
        return mount_options
def main():
    """Parse the command line and run the storage mount task.

    Expects ``--mount-root`` and ``--tmp-dir``; ``--task`` is optional and
    defaults to ``MOUNT_DATA_STORAGES``. When the ``EXEC_ENVIRONMENT``
    variable equals ``DTS`` the mount is skipped and only a success message
    is logged.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--mount-root', required=True)
    arg_parser.add_argument('--tmp-dir', required=True)
    arg_parser.add_argument('--task', required=False, default=MOUNT_DATA_STORAGES)
    options = arg_parser.parse_args()

    if os.environ.get(EXEC_ENVIRONMENT) == DTS:
        Logger.success('Skipping cloud storage mount for execution environment %s' % DTS,
                       task_name=options.task)
        return

    mount_task = MountStorageTask(options.task)
    mount_task.run(options.mount_root, options.tmp_dir)


if __name__ == '__main__':
    main()