def start()

in pipe-cli/mount/pipe-fuse.py [0:0]


def start(mountpoint, webdav, bucket,
          read_buffer_size, read_ahead_min_size, read_ahead_max_size, read_ahead_size_multiplier,
          read_disk_buffer_path, read_disk_buffer_read_ahead_size, read_disk_buffer_ttl, read_disk_buffer_ttl_delay,
          write_buffer_size, trunc_buffer_size, chunk_size,
          listing_cache_ttl, listing_cache_size,
          xattrs_include_prefixes, xattrs_exclude_prefixes,
          xattrs_cache_ttl, xattrs_cache_size,
          disabled_operations, default_mode,
          mount_options, threads, monitoring_delay, recording,
          show_archived, storage_class_exclude,
          audit_buffer_ttl, audit_buffer_size, fix_permissions):
    """Build the file system client / operations chain and mount it with FUSE.

    The mountpoint directory is created when it does not exist yet. The Cloud
    Pipeline connection settings are read from the API and API_TOKEN environment
    variables. The CP_PIPE_FUSE_* environment variables read at the top of the
    function, when set, take precedence over the corresponding arguments
    (chunk_size, read_ahead_*, read_disk_buffer_* and audit_buffer_*).

    A truthy webdav selects the WebDAV client; otherwise bucket is resolved
    through the Cloud Pipeline API and has to be an S3 or GS storage. The base
    client is then wrapped, in order, with the optional filtering, recording,
    path expanding, caching, buffering and truncating clients, and the resulting
    client is handed to PipeFS which is wrapped with the restricting, resilient
    and recording file systems. Finally the collected daemons are started and
    FUSE is launched with foreground=True.

    :param mountpoint: directory to mount the file system to.
    :param webdav: WebDAV url; when truthy, bucket is not used.
    :param bucket: storage passed to CloudPipelineClient.init_bucket_object.
    :param read_buffer_size: capacity of the memory read ahead buffer;
        memory buffering read is enabled only if it is > 0.
    :param read_ahead_min_size: passed to the memory read ahead client.
    :param read_ahead_max_size: passed to the memory read ahead client.
    :param read_ahead_size_multiplier: passed to the memory read ahead client.
    :param read_disk_buffer_path: path of the disk read buffer; disk buffering
        read is enabled only if it is truthy.
    :param read_disk_buffer_read_ahead_size: read ahead size of the disk buffer.
    :param read_disk_buffer_ttl: a DiskBufferTTLDaemon is scheduled only if
        it is > 0.
    :param read_disk_buffer_ttl_delay: delay passed to DiskBufferTTLDaemon.
    :param write_buffer_size: capacity of the memory write buffer;
        memory buffering write is enabled only if it is > 0.
    :param trunc_buffer_size: capacity of the truncating clients;
        truncating support is enabled only if it is > 0.
    :param chunk_size: chunk size passed to the S3 / GS low level clients.
    :param listing_cache_ttl: listing caching is enabled only if both
        listing_cache_ttl and listing_cache_size are > 0.
    :param listing_cache_size: max size of the listing cache.
    :param xattrs_include_prefixes: S3 only; extended attributes prefixes to
        process, a leading '*' element means all of them.
    :param xattrs_exclude_prefixes: S3 only; extended attributes prefixes to
        skip, a leading '*' element disables all extended attributes operations.
    :param xattrs_cache_ttl: S3 only; extended attributes caching is enabled
        only if both xattrs_cache_ttl and xattrs_cache_size are > 0.
    :param xattrs_cache_size: max size of the extended attributes cache.
    :param disabled_operations: operations excluded via RestrictingOperationsFS;
        the passed list is not modified.
    :param default_mode: string with an octal number, parsed with
        int(default_mode, 8) and passed to PipeFS as mode.
    :param mount_options: mapping with additional keyword options forwarded to
        FUSE; its 'ro' item is merged with client.is_read_only(). The passed
        mapping is not modified.
    :param threads: truthy to use thread safe caches and a threaded FUSE.
    :param monitoring_delay: passed to get_lock.
    :param recording: truthy to wrap both the client and the file system
        with their recording counterparts.
    :param show_archived: S3 only; when falsy the archived files filter is used.
    :param storage_class_exclude: classes passed to
        StorageClassFilterFileSystemClient; no filtering if falsy.
    :param audit_buffer_ttl: bucket mounts only; auditing is enabled only if
        it is > 0.
    :param audit_buffer_size: passed to get_audit_client.
    :param fix_permissions: WebDAV only; truthy to wrap the client with
        PermissionAwareWebDavFileSystemClient.
    :raises RuntimeError: if API_TOKEN is not specified, if API is not specified
        for a bucket mount or if the bucket storage type is not supported.
    :raises OSError: if the mountpoint directory cannot be created for a reason
        other than it already existing.
    """
    try:
        os.makedirs(mountpoint)
    except OSError as e:
        # An already existing mountpoint is fine, anything else is fatal.
        if e.errno != errno.EEXIST:
            raise

    # Both containers are altered below (extend / pop), so work on private copies
    # in order not to leak the changes into the caller's objects.
    disabled_operations = list(disabled_operations) if disabled_operations else []
    mount_options = dict(mount_options) if mount_options else {}

    api = os.getenv('API', '')
    bearer = os.getenv('API_TOKEN', '')
    # Environment variables, when set, override the corresponding arguments.
    chunk_size = int(os.getenv('CP_PIPE_FUSE_CHUNK_SIZE', chunk_size))
    read_ahead_min_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MIN_SIZE', read_ahead_min_size))
    read_ahead_max_size = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_MAX_SIZE', read_ahead_max_size))
    read_ahead_size_multiplier = int(os.getenv('CP_PIPE_FUSE_READ_AHEAD_SIZE_MULTIPLIER',
                                               read_ahead_size_multiplier))
    read_disk_buffer_path = os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_PATH', read_disk_buffer_path)
    read_disk_buffer_read_ahead_size = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_READ_AHEAD_SIZE',
                                                     read_disk_buffer_read_ahead_size))
    read_disk_buffer_ttl = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_TTL', read_disk_buffer_ttl))
    read_disk_buffer_ttl_delay = int(os.getenv('CP_PIPE_FUSE_READ_DISK_BUFFER_TTL_DELAY', read_disk_buffer_ttl_delay))
    audit_buffer_ttl = int(os.getenv('CP_PIPE_FUSE_AUDIT_BUFFER_TTL', audit_buffer_ttl))
    audit_buffer_size = int(os.getenv('CP_PIPE_FUSE_AUDIT_BUFFER_SIZE', audit_buffer_size))
    fs_name = os.getenv('CP_PIPE_FUSE_FS_NAME', 'PIPE_FUSE')
    # bucket_type / bucket_path stay None for WebDAV mounts, which switches off
    # every storage specific wrapper below.
    bucket_type = None
    bucket_path = None
    daemons = []
    if not bearer:
        raise RuntimeError('Cloud Pipeline API_TOKEN should be specified.')

    # Base client: either WebDAV or a cloud storage resolved via Cloud Pipeline API.
    if webdav:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        client = WebDavClient(webdav_url=webdav, bearer=bearer)
        client = ResilientWebDavFileSystemClient(client)
        if fix_permissions:
            client = PermissionAwareWebDavFileSystemClient(client, webdav, bearer)
    else:
        if not api:
            raise RuntimeError('Cloud Pipeline API should be specified.')
        pipe = CloudPipelineClient(api=api, token=bearer)
        bucket_object = pipe.init_bucket_object(bucket)
        bucket_type = bucket_object.type
        bucket_name = bucket_object.root
        # Everything after the first path segment is the path inside the bucket.
        bucket_path = '/'.join(bucket_object.path.split('/')[1:])
        if bucket_type == CloudType.S3:
            client = S3StorageLowLevelClient(bucket_name, bucket_object, pipe=pipe, chunk_size=chunk_size)
            if not show_archived:
                client = ArchivedFilesFilterFileSystemClient(client, pipe=pipe, bucket=client.bucket_object)
            client = ArchivedAttributesFileSystemClient(client, pipe=pipe, bucket=client.bucket_object)
        elif bucket_type == CloudType.GS:
            client = GoogleStorageLowLevelFileSystemClient(bucket_name, bucket_object, pipe=pipe, chunk_size=chunk_size)
        else:
            raise RuntimeError('Cloud storage type %s is not supported.' % bucket_object.type)
        if audit_buffer_ttl > 0:
            logging.info('Auditing is enabled.')
            client, daemon = get_audit_client(client, pipe, bucket_object, audit_buffer_ttl, audit_buffer_size)
            daemons.append(daemon)
        else:
            logging.info('Auditing is disabled.')
        client = StorageHighLevelFileSystemClient(client)

    # Optional client wrappers. The order matters: each wrapper decorates
    # the client built so far.
    if storage_class_exclude:
        client = StorageClassFilterFileSystemClient(client, classes=storage_class_exclude)
    if recording:
        client = RecordingFileSystemClient(client)
    if bucket_type in [CloudType.S3, CloudType.GS]:
        client = PathExpandingStorageFileSystemClient(client, root_path=bucket_path)
    if listing_cache_ttl > 0 and listing_cache_size > 0:
        listing_cache_implementation = TTLCache(maxsize=listing_cache_size, ttl=listing_cache_ttl)
        listing_cache = ListingCache(listing_cache_implementation)
        if threads:
            listing_cache = ThreadSafeListingCache(listing_cache)
        client = CachingListingFileSystemClient(client, listing_cache)
    else:
        logging.info('Listing caching is disabled.')
    if bucket_type == CloudType.S3:
        if xattrs_cache_ttl > 0 and xattrs_cache_size > 0:
            xattrs_cache_implementation = TTLCache(maxsize=xattrs_cache_size, ttl=xattrs_cache_ttl)
            xattrs_cache = ExtendedAttributesCache(xattrs_cache_implementation)
            if threads:
                xattrs_cache = ThreadSafeExtendedAttributesCache(xattrs_cache)
            client = ExtendedAttributesCachingFileSystemClient(client, xattrs_cache)
        else:
            logging.info('Extended attributes caching is disabled.')
    if read_disk_buffer_path:
        logging.info('Disk buffering read is enabled.')
        client = DiskBufferingReadAllFileSystemClient(client,
                                                      read_ahead_size=read_disk_buffer_read_ahead_size,
                                                      path=read_disk_buffer_path)
        if read_disk_buffer_ttl > 0:
            logging.info('Disk buffering read ttl is enabled.')
            daemons.append(DiskBufferTTLDaemon(path=read_disk_buffer_path,
                                               ttl=read_disk_buffer_ttl,
                                               delay=read_disk_buffer_ttl_delay))
        else:
            logging.info('Disk buffering read ttl is not enabled.')
    else:
        logging.info('Disk buffering read is disabled.')
    if read_buffer_size > 0:
        logging.info('Memory buffering read is enabled.')
        client = MemoryBufferingReadAheadFileSystemClient(client,
                                                          read_ahead_min_size=read_ahead_min_size,
                                                          read_ahead_max_size=read_ahead_max_size,
                                                          read_ahead_size_multiplier=read_ahead_size_multiplier,
                                                          capacity=read_buffer_size)
    else:
        logging.info('Memory buffering read is disabled.')
    if write_buffer_size > 0:
        logging.info('Memory buffering write is enabled.')
        client = MemoryBufferingWriteFileSystemClient(client, capacity=write_buffer_size)
    else:
        logging.info('Memory buffering write is disabled.')
    if trunc_buffer_size > 0:
        # The truncating strategy depends on the underlying storage.
        if webdav:
            client = CopyOnDownTruncateFileSystemClient(client, capacity=trunc_buffer_size)
            client = WriteLastNullOnUpTruncateFileSystemClient(client)
        elif bucket_type == CloudType.S3:
            client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size)
        elif bucket_type == CloudType.GS:
            client = CopyOnDownTruncateFileSystemClient(client, capacity=trunc_buffer_size)
            client = WriteNullsOnUpTruncateFileSystemClient(client, capacity=trunc_buffer_size)
    else:
        logging.info('Truncating support is disabled.')
    if threads:
        logging.info('Threading is enabled.')
    else:
        logging.info('Threading is disabled.')

    # File system operations chain on top of the assembled client.
    fs = PipeFS(client=client, lock=get_lock(threads, monitoring_delay=monitoring_delay), mode=int(default_mode, 8))
    if bucket_type == CloudType.S3:
        if xattrs_include_prefixes:
            if xattrs_include_prefixes[0] == '*':
                logging.info('All extended attributes will be processed.')
            else:
                fs = RestrictingExtendedAttributesFS(fs, include_prefixes=xattrs_include_prefixes)
        if xattrs_exclude_prefixes:
            if xattrs_exclude_prefixes[0] == '*':
                logging.info('All extended attributes operations will be disabled.')
                disabled_operations.extend(_xattrs_operations)
            else:
                fs = RestrictingExtendedAttributesFS(fs, exclude_prefixes=xattrs_exclude_prefixes)
    else:
        # Extended attributes are handled for S3 storages only.
        logging.info('All extended attributes operations will be disabled.')
        disabled_operations.extend(_xattrs_operations)
    if disabled_operations:
        fs = RestrictingOperationsFS(fs, exclude=disabled_operations)
    fs = ResilientFS(fs)
    if recording:
        fs = RecordingFS(fs)

    logging.info('File system processing chain: \n%s', fs.summary())

    if daemons:
        logging.info('Initiating file system daemons...')
        for daemon in daemons:
            daemon.start()

    logging.info('Initializing file system...')
    enable_additional_operations()
    ro = client.is_read_only() or mount_options.get('ro', False)
    # 'ro' is passed explicitly below, so it must not be repeated in **mount_options.
    mount_options.pop('ro', None)
    FUSE(fs, mountpoint, nothreads=not threads, foreground=True, ro=ro, fsname=fs_name, **mount_options)