# pipe-cli/mount/pipe-fuse.py
def start(mountpoint, webdav, bucket,
          read_buffer_size, read_ahead_min_size, read_ahead_max_size, read_ahead_size_multiplier,
          read_disk_buffer_path, read_disk_buffer_read_ahead_size, read_disk_buffer_ttl, read_disk_buffer_ttl_delay,
          write_buffer_size, trunc_buffer_size, chunk_size,
          listing_cache_ttl, listing_cache_size,
          xattrs_include_prefixes, xattrs_exclude_prefixes,
          xattrs_cache_ttl, xattrs_cache_size,
          disabled_operations, default_mode,
          mount_options, threads, monitoring_delay, recording,
          show_archived, storage_class_exclude,
          audit_buffer_ttl, audit_buffer_size, fix_permissions):
    try:
        os.makedirs(mountpoint)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
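    # Environment variables take precedence over the corresponding command line arguments,
    # e.g. an exported CP_PIPE_FUSE_CHUNK_SIZE value overrides the configured chunk size.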
    api = os.getenv('API', '')
    bearer = os.getenv('API_TOKEN', '')
    chunk_size = int(os.getenv('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size))
    read_ahead_min_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size))
    read_ahead_max_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size))
    read_ahead_size_multiplier = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER',
                                               read_ahead_size_multiplier))
    read_disk_buffer_path = os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_PATH', read_disk_buffer_path)
    read_disk_buffer_read_ahead_size = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_READ_AHEAD_SIZE',
                                                     read_disk_buffer_read_ahead_size))
    read_disk_buffer_ttl = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_TTL', read_disk_buffer_ttl))
    read_disk_buffer_ttl_delay = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_TTL_DELAY', read_disk_buffer_ttl_delay))
    audit_buffer_ttl = int(os.getenv('CP_PIPE_FUSE_AUDIT_BUFFER_TTL', audit_buffer_ttl))
    audit_buffer_size = int(os.getenv('CP_PIPE_FUSE_AUDIT_BUFFER_SIZE', audit_buffer_size))
    fs_name = os.getenv('CP_PIPE_FUSE_FS_NAME', 'PIPE_FUSE')
    bucket_type = None
    bucket_path = None
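    # Background daemons (audit, disk read buffer TTL cleanup) are collected here and started
    # once the whole file system chain has been assembled.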
    daemons = []
    if not bearer:
        raise RuntimeError('Cloud Pipeline API_TOKEN should be specified.')
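    # Build the file system client chain: either a WebDAV share or a cloud object storage bucket,
    # progressively wrapped with decorators that add resilience, filtering, caching and buffering.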
    if webdav:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        client = WebDavClient(webdav_url=webdav, bearer=bearer)
        client = ResilientWebDavFileSystemClient(client)
        if fix_permissions:
            client = PermissionAwareWebDavFileSystemClient(client, webdav, bearer)
    else:
        if not api:
            raise RuntimeError('Cloud Pipeline API should be specified.')
        pipe = CloudPipelineClient(api=api, token=bearer)
        bucket_object = pipe.init_bucket_object(bucket)
        bucket_type = bucket_object.type
        bucket_name = bucket_object.root
        bucket_path = '/'.join(bucket_object.path.split('/')[1:])
        if bucket_type == CloudType.S3:
            client = S3StorageLowLevelClient(bucket_name, bucket_object, pipe=pipe, chunk_size=chunk_size)
            if not show_archived:
                client = ArchivedFilesFilterFileSystemClient(client, pipe=pipe, bucket=client.bucket_object)
            client = ArchivedAttributesFileSystemClient(client, pipe=pipe, bucket=client.bucket_object)
        elif bucket_type == CloudType.GS:
            client = GoogleStorageLowLevelFileSystemClient(bucket_name, bucket_object, pipe=pipe, chunk_size=chunk_size)
        else:
            raise RuntimeError('Cloud storage type %s is not supported.' % bucket_object.type)
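        # Optionally wrap the storage client for auditing: get_audit_client returns the wrapped
        # client together with a background daemon that is started later with the other daemons.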
        if audit_buffer_ttl > 0:
            logging.info('Auditing is enabled.')
            client, daemon = get_audit_client(client, pipe, bucket_object, audit_buffer_ttl, audit_buffer_size)
            daemons.append(daemon)
        else:
            logging.info('Auditing is disabled.')
        client = StorageHighLevelFileSystemClient(client)
        if storage_class_exclude:
            client = StorageClassFilterFileSystemClient(client, classes=storage_class_exclude)
    if recording:
        client = RecordingFileSystemClient(client)
    if bucket_type in [CloudType.S3, CloudType.GS]:
        client = PathExpandingStorageFileSystemClient(client, root_path=bucket_path)
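    # Cache directory listings in a TTL-bound in-memory cache to reduce repeated storage calls.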
    if listing_cache_ttl > 0 and listing_cache_size > 0:
        listing_cache_implementation = TTLCache(maxsize=listing_cache_size, ttl=listing_cache_ttl)
        listing_cache = ListingCache(listing_cache_implementation)
        if threads:
            listing_cache = ThreadSafeListingCache(listing_cache)
        client = CachingListingFileSystemClient(client, listing_cache)
    else:
        logging.info('Listing caching is disabled.')
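    # Extended attributes are only handled for S3 buckets; when configured, they are cached
    # in a separate TTL-bound in-memory cache.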
    if bucket_type == CloudType.S3:
        if xattrs_cache_ttl > 0 and xattrs_cache_size > 0:
            xattrs_cache_implementation = TTLCache(maxsize=xattrs_cache_size, ttl=xattrs_cache_ttl)
            xattrs_cache = ExtendedAttributesCache(xattrs_cache_implementation)
            if threads:
                xattrs_cache = ThreadSafeExtendedAttributesCache(xattrs_cache)
            client = ExtendedAttributesCachingFileSystemClient(client, xattrs_cache)
        else:
            logging.info('Extended attributes caching is disabled.')
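    # Optionally buffer reads on local disk under read_disk_buffer_path; a DiskBufferTTLDaemon
    # cleans the buffer up when a positive TTL is configured.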
    if read_disk_buffer_path:
        logging.info('Disk buffering read is enabled.')
        client = DiskBufferingReadAllFileSystemClient(client,
                                                      read_ahead_size=read_disk_buffer_read_ahead_size,
                                                      path=read_disk_buffer_path)
        if read_disk_buffer_ttl > 0:
            logging.info('Disk buffering read ttl is enabled.')
            daemons.append(DiskBufferTTLDaemon(path=read_disk_buffer_path,
                                               ttl=read_disk_buffer_ttl,
                                               delay=read_disk_buffer_ttl_delay))
        else:
            logging.info('Disk buffering read ttl is not enabled.')
    else:
        logging.info('Disk buffering read is disabled.')
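    # In-memory read-ahead is bounded by read_ahead_min_size/read_ahead_max_size and scaled by
    # read_ahead_size_multiplier; writes are buffered in memory separately when write_buffer_size > 0.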
    if read_buffer_size > 0:
        logging.info('Memory buffering read is enabled.')
        client = MemoryBufferingReadAheadFileSystemClient(client,
                                                          read_ahead_min_size=read_ahead_min_size,
                                                          read_ahead_max_size=read_ahead_max_size,
                                                          read_ahead_size_multiplier=read_ahead_size_multiplier,
                                                          capacity=read_buffer_size)
    else:
        logging.info('Memory buffering read is disabled.')
    if write_buffer_size > 0:
        logging.info('Memory buffering write is enabled.')
        client = MemoryBufferingWriteFileSystemClient(client, capacity=write_buffer_size)
    else:
        logging.info('Memory buffering write is disabled.')
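    # Truncate support is emulated per backend: WebDAV shrinks files via a buffered copy and extends
    # them by writing a trailing null, S3 only extends by writing nulls, GS supports both strategies.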
    if trunc_buffer_size > 0:
        if webdav:
            client = CopyOnDownTruncateFileSystemClient(client, capacity=trunc_buffer_size)
            client = WriteLastNullOnUpTruncateFileSystemClient(client)
        elif bucket_type == CloudType.S3:
            client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size)
        elif bucket_type == CloudType.GS:
            client = CopyOnDownTruncateFileSystemClient(client, capacity=trunc_buffer_size)
            client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size)
    else:
        logging.info('Truncating support is disabled.')
    if threads:
        logging.info('Threading is enabled.')
    else:
        logging.info('Threading is disabled.')
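    # Assemble the FUSE implementation on top of the client chain; default_mode is an octal string.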
    fs = PipeFS(client=client, lock=get_lock(threads, monitoring_delay=monitoring_delay), mode=int(default_mode, 8))
    if bucket_type == CloudType.S3:
        if xattrs_include_prefixes:
            if xattrs_include_prefixes[0] == '*':
                logging.info('All extended attributes will be processed.')
            else:
                fs = RestrictingExtendedAttributesFS(fs, include_prefixes=xattrs_include_prefixes)
        if xattrs_exclude_prefixes:
            if xattrs_exclude_prefixes[0] == '*':
                logging.info('All extended attributes operations will be disabled.')
                disabled_operations.extend(_xattrs_operations)
            else:
                fs = RestrictingExtendedAttributesFS(fs, exclude_prefixes=xattrs_exclude_prefixes)
    else:
        logging.info('All extended attributes operations will be disabled.')
        disabled_operations.extend(_xattrs_operations)
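    # Drop any operations explicitly disabled above, then add resilience and optional call recording.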
    if disabled_operations:
        fs = RestrictingOperationsFS(fs, exclude=disabled_operations)
    fs = ResilientFS(fs)
    if recording:
        fs = RecordingFS(fs)
    logging.info('File system processing chain: \n%s', fs.summary())
    if daemons:
        logging.info('Initiating file system daemons...')
        for daemon in daemons:
            daemon.start()
    logging.info('Initializing file system...')
    enable_additional_operations()
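    # A read-only mount is forced if either the client chain is read-only or 'ro' was passed as a
    # mount option; 'ro' is popped from mount_options so it is not passed to FUSE twice.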
    ro = client.is_read_only() or mount_options.get('ro', False)
    mount_options.pop('ro', None)
    FUSE(fs, mountpoint, nothreads=not threads, foreground=True, ro=ro, fsname=fs_name, **mount_options)