# workflows/pipe-common/scripts/mount_storage.py
def run(self, mount_root, tmp_dir):
    """Mount all remote data storages available to this run under ``mount_root``.

    Loads the list of allowed storages from the API, applies the filters
    configured via the CP_CAP_SKIP_MOUNTS / CP_CAP_FORCE_MOUNTS /
    CP_CAP_LIMIT_MOUNTS environment variables, installs and initializes the
    required mounter tooling, then mounts each remaining storage.

    :param mount_root: root directory under which per-storage mount points are built
    :param tmp_dir: scratch directory handed to each mounter for its temporary files

    Exits the process with code 1 if any storage fails to mount or an
    unhandled error occurs; otherwise logs success and returns None.
    """
    try:
        Logger.info('Starting mounting remote data storages.', task_name=self.task_name)
        self.wait_before_mount()
        if not self.region_id:
            Logger.warn('Cannot filter storages by region because cloud region id is not configured. '
                        'No file storages will be mounted and object storages from all regions will be mounted.',
                        task_name=self.task_name)
        if self.run_id:
            Logger.info('Fetching run info...', task_name=self.task_name)
            run = self.api.load_run(self.run_id)
        else:
            Logger.warn('Cannot load run info because run id is not configured.', task_name=self.task_name)
            run = None
        Logger.info('Fetching list of allowed storages...', task_name=self.task_name)
        available_storages_with_mounts = self.api.load_available_storages_with_share_mount(self.region_id or None)
        # filtering out shared storage folders, as they cause "overlapped" mounts
        # and break the "original" storage mountpoint,
        # storage is shared folder of another storage if it has source_storage_id
        available_storages_with_mounts = [
            x for x in available_storages_with_mounts if not x.storage.source_storage_id
        ]
        storages_ids_by_path = {x.storage.path: x.storage.id for x in available_storages_with_mounts}
        additional_mount_options = dict(self._load_mount_options_from_environ())
        # filtering out all nfs storages if region id is missing
        if not self.region_id:
            available_storages_with_mounts = [x for x in available_storages_with_mounts
                                              if x.storage.storage_type != NFS_TYPE]
        # Backup a full filtered list, before removing storage to be skipped
        # We can further refer to this list, if any storage detail is required
        # Otherwise we need to call API once again
        self.available_storages = list(available_storages_with_mounts)
        # Filter out storages, which are requested to be skipped
        # NOTE: Any storage, defined by CP_CAP_FORCE_MOUNTS will still be mounted
        skip_storages = os.getenv('CP_CAP_SKIP_MOUNTS')
        if skip_storages:
            Logger.info('Storage(s) "{}" requested to be skipped'.format(skip_storages), task_name=self.task_name)
            skip_storages_list = self.parse_storage_list(skip_storages, storages_ids_by_path)
            available_storages_with_mounts = [x for x in available_storages_with_mounts
                                              if x.storage.id not in skip_storages_list]
        # If the storages are limited by the user - we make sure that the "forced" storages are still available
        # This is useful for the tools, which require "databases" or other data from the File/Object storages
        force_storages = os.getenv('CP_CAP_FORCE_MOUNTS')
        force_storages_list = []
        if force_storages:
            Logger.info('Storage(s) "{}" forced to be mounted even if the storage mounts list is limited'.format(force_storages), task_name=self.task_name)
            force_storages_list = self.parse_storage_list(force_storages, storages_ids_by_path)
        limited_storages = os.getenv('CP_CAP_LIMIT_MOUNTS')
        if limited_storages:
            # Append "forced" storage to the "limited" list, if it's set
            if force_storages:
                force_storages = ','.join([str(x) for x in force_storages_list])
                limited_storages = ','.join([limited_storages, force_storages])
            try:
                limited_storages_list = []
                if limited_storages.lower() != MOUNT_LIMITS_NONE:
                    limited_storages_list = self.parse_storage_list(limited_storages, storages_ids_by_path)
                # Remove duplicates from the `limited_storages_list`, as they can be introduced by `force_storages` or a user's typo
                limited_storages_list = list(set(limited_storages_list))
                available_storages_with_mounts = [x for x in available_storages_with_mounts
                                                  if x.storage.id in limited_storages_list]
                # append sensitive storages since they are not returned in common mounts
                loaded_mounts = {}
                for storage_id in limited_storages_list:
                    storage = self.api.find_datastorage(str(storage_id))
                    if storage.sensitive:
                        available_storages_with_mounts.append(
                            DataStorageWithShareMount(storage, self._get_storage_mount(storage, loaded_mounts)))
                Logger.info('Run is launched with mount limits ({}) Only {} storages will be mounted'.format(
                    limited_storages, len(available_storages_with_mounts)), task_name=self.task_name)
            except Exception as limited_storages_ex:
                # Best-effort: a malformed CP_CAP_LIMIT_MOUNTS value must not abort the whole mount task
                Logger.warn('Unable to parse CP_CAP_LIMIT_MOUNTS value({}) with error: {}.'.format(limited_storages, str(limited_storages_ex)), task_name=self.task_name)
                traceback.print_exc()
        if not available_storages_with_mounts:
            Logger.success('No remote storages are available or CP_CAP_LIMIT_MOUNTS configured to none', task_name=self.task_name)
            return
        Logger.info('Found {} available storage(s). Checking mount options.'.format(len(available_storages_with_mounts)), task_name=self.task_name)
        sensitive_policy = None
        sensitive_policy_preference = self.api.get_preference(SENSITIVE_POLICY_PREFERENCE)
        if sensitive_policy_preference and 'value' in sensitive_policy_preference:
            sensitive_policy = sensitive_policy_preference['value']
        # Prepare tooling only for storage types actually present in the filtered list.
        # Iterate over a snapshot in case installation mutates self.mounters.
        for mounter in list(self.mounters.values()):
            if any(dsm.storage.storage_type == mounter.type() for dsm in available_storages_with_mounts):
                mounter.check_or_install(self.task_name, sensitive_policy)
                mounter.init_tmp_dir(tmp_dir, self.task_name)
        storages_metadata = self._collect_storages_metadata(available_storages_with_mounts)
        if not any(mounter.is_available() for mounter in self.mounters.values()):
            Logger.success('Mounting of remote storages is not available for this image', task_name=self.task_name)
            return
        initialized_mounters = []
        for storage_and_mount in available_storages_with_mounts:
            if not PermissionHelper.is_storage_readable(storage_and_mount.storage):
                Logger.info('Storage is not readable', task_name=self.task_name)
                continue
            if PermissionHelper.is_storage_mount_disabled(storage_and_mount.storage):
                Logger.info('Storage disabled for mounting, skipping.', task_name=self.task_name)
                continue
            if not PermissionHelper.is_storage_available_for_mount(storage_and_mount.storage, run):
                storage_not_allowed_msg = 'Storage {} is not allowed for {} image'\
                    .format(storage_and_mount.storage.name, (run or {}).get("actualDockerImage", ""))
                # Storages from CP_CAP_FORCE_MOUNTS bypass the per-image allow list
                if storage_and_mount.storage.id in force_storages_list:
                    Logger.info(storage_not_allowed_msg + ', but it is forced to be mounted', task_name=self.task_name)
                else:
                    Logger.info(storage_not_allowed_msg + ', skipping it', task_name=self.task_name)
                    continue
            storage_metadata = storages_metadata.get(storage_and_mount.storage.id, {})
            mounter = self.mounters[storage_and_mount.storage.storage_type](self.api, storage_and_mount.storage,
                                                                            storage_metadata,
                                                                            storage_and_mount.file_share_mount,
                                                                            sensitive_policy,
                                                                            additional_mount_options.get(storage_and_mount.storage.id)) \
                if storage_and_mount.storage.storage_type in self.mounters else None
            if not mounter:
                Logger.warn('Unsupported storage type {}.'.format(storage_and_mount.storage.storage_type), task_name=self.task_name)
            elif mounter.is_available():
                initialized_mounters.append(mounter)
        # Sort by mount point path: lexicographic order places a parent path before
        # any nested paths, and makes the mount order deterministic
        initialized_mounters.sort(key=lambda mnt: mnt.build_mount_point(mount_root))
        failed_storages = []
        for mnt in initialized_mounters:
            try:
                mnt.mount(mount_root, self.task_name)
            except RuntimeError:
                # A single failing storage should not prevent the rest from mounting;
                # failures are collected and reported together below
                Logger.warn('Data storage {} mounting has failed: {}'
                            .format(mnt.storage.name, traceback.format_exc()),
                            task_name=self.task_name)
                failed_storages.append(mnt.storage.name)
        if failed_storages:
            Logger.fail('The following data storages have not been mounted: {}'.format(', '.join(failed_storages)),
                        task_name=self.task_name)
            exit(1)
        Logger.success('Finished data storage mounting', task_name=self.task_name)
    except Exception:
        # Top-level boundary: log the full traceback, mark the task failed and exit non-zero
        Logger.fail('Unhandled error during mount task: {}.'.format(traceback.format_exc()),
                    task_name=self.task_name)
        exit(1)