pipe-cli/mount/pipefuse/pipefs.py (325 lines of code) (raw):
# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import io
import logging
import os
import platform
import stat
from threading import RLock
import datetime
import errno
import time
from botocore.exceptions import ClientError
from dateutil.tz import tzlocal
from fuse import FuseOSError, Operations
from pipefuse import fuseutils
from pipefuse.chain import ChainingService
from pipefuse.fsclient import FileSystemOperationException, \
UnsupportedOperationException, ForbiddenOperationException, \
NotFoundOperationException, NoDataOperationException, InvalidOperationException
class FileHandleContainer(object):
    """
    Thread-safe allocator of integer file handles.

    Handles are drawn from ``fuseutils.lazy_range(FH_START, FH_END)``
    (presumably end-exclusive like ``range``); the lowest handle that is not
    currently allocated is always handed out first.
    """

    FH_START = 2
    FH_END = 10000

    def __init__(self):
        # Handles that are currently allocated.
        self._container = set()
        self._range = fuseutils.lazy_range(self.FH_START, self.FH_END)
        self._fh_lock = RLock()

    def get(self):
        """
        Allocates the lowest free file handle.

        :return: Newly allocated file handle.
        :raises OSError: With ``errno.EMFILE`` if every handle is already in use.
        """
        with self._fh_lock:
            for fh in self._range:
                if fh not in self._container:
                    self._container.add(fh)
                    return fh
        # Fail explicitly: returning None would hand FUSE an unusable file handle.
        raise OSError(errno.EMFILE, 'No free file handles are available')

    def release(self, fh):
        """
        Returns a file handle to the pool.

        Releasing a handle that is not allocated is a no-op.

        :param fh: File handle previously returned by ``get``.
        """
        with self._fh_lock:
            self._container.discard(fh)
def syncronized(func):
    """
    Decorator that serializes a file system operation on its path.

    The decorated callable must be a method whose instance exposes ``_lock``
    (an object with ``lock(path)`` and ``unlock(path)``) and whose first
    positional argument after ``self`` is the path to lock.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        lock = args[0]._lock
        path = args[1]
        # Acquire before entering try: if locking fails there is nothing to unlock.
        lock.lock(path)
        try:
            return func(*args, **kwargs)
        finally:
            lock.unlock(path)
    return wrapper
def errorlogged(func):
    """
    Decorator that logs unexpected errors of a file system operation.

    ``FileSystemOperationException`` errors are re-raised without logging.
    Any other exception is logged with the operation name and the path (the
    first positional argument after ``self``) and then re-raised unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        path = args[1]
        try:
            return func(*args, **kwargs)
        except FileSystemOperationException:
            raise
        except Exception:
            logging.exception('Error occurred while %s for %s', func.__name__, path)
            raise
    return wrapper
class PipeFS(Operations, ChainingService):
    """
    FUSE operations backed by a file system client.

    Each FUSE callback is delegated to ``client``. Mutating and per-file
    operations are serialized per path through ``lock`` (``syncronized``) and
    unexpected errors are logged (``errorlogged``). Expected failures are raised
    as ``FileSystemOperationException`` subclasses and are left for an outer
    layer (such as ``ResilientFS``) to translate into FUSE error codes.
    """

    def __init__(self, client, lock, mode=0o755):
        """
        :param client: File system client performing the storage operations.
        :param lock: Per-path lock exposing ``lock(path)`` and ``unlock(path)``.
        :param mode: Permission bits reported for all files and directories.
        :raises RuntimeError: If the client reports the server as unavailable.
        """
        self._client = client
        if not self._client.is_available():
            raise RuntimeError('File system server is not available.')
        self._container = FileHandleContainer()
        self._mode = mode
        self._delimiter = '/'
        self._root = '/'
        self._is_mac = platform.system() == 'Darwin'
        self._is_win = platform.system() == 'Windows'
        # os.getgid/os.getuid are Unix-only, so fixed ids are reported on Windows.
        self._gid = 1 if self._is_win else os.getgid()
        self._uid = 1 if self._is_win else os.getuid()
        self._lock = lock

    def _is_skipped_mac_files(self, path):
        """Returns True for macOS metadata files (.DS_Store, .localized, ._*) when running on macOS."""
        if not self._is_mac:
            return False
        filename = os.path.basename(path)
        return filename in ['.DS_Store', '.localized'] or filename.startswith('._')

    # Filesystem methods
    # ==================

    @errorlogged
    def win_get_attributes(self, path):
        # See https://docs.microsoft.com/en-us/windows/win32/fileio/file-attribute-constants
        # 0x10 is FILE_ATTRIBUTE_DIRECTORY; files report no attributes at all.
        attrs = self.getattr(path)
        return 0x10 if attrs['st_mode'] & stat.S_IFDIR == stat.S_IFDIR else 0

    def win_set_attributes(self, path, attrs, fh=None):
        # Windows file attributes are not stored; the request is ignored.
        pass

    def win_set_times(self, path, creation_time, last_access_time, last_write_time, fh=None):
        # Windows file times are not stored; the request is ignored.
        pass

    @errorlogged
    def access(self, path, mode):
        """
        Checks that path is accessible; the root always is.

        :raises ForbiddenOperationException: For macOS metadata files and missing paths.
        """
        if path == self._root:
            return
        if self._is_skipped_mac_files(path) or not self._client.exists(path):
            raise ForbiddenOperationException()

    def chmod(self, path, mode):
        # Permissions always come from the mount mode; the request is ignored.
        pass

    def chown(self, path, uid, gid):
        # Ownership always comes from the mounting process ids; the request is ignored.
        pass

    @errorlogged
    def getattr(self, path, fh=None):
        """
        Returns stat-like attributes of path.

        :raises NotFoundOperationException: For macOS metadata files and paths the client does not know.
        """
        if self._is_skipped_mac_files(path):
            raise NotFoundOperationException()
        props = self._client.attrs(path)
        if not props:
            raise NotFoundOperationException()
        if path == self._root or props.is_dir:
            mode = stat.S_IFDIR
        else:
            mode = stat.S_IFREG
        attrs = {
            'st_size': props.size,
            'st_nlink': 1,
            'st_mode': mode | self._mode,
            'st_gid': self._gid,
            'st_uid': self._uid,
            # The current time is always reported as the access time.
            'st_atime': time.mktime(datetime.datetime.now(tz=tzlocal()).timetuple())
        }
        if props.mtime:
            attrs['st_mtime'] = props.mtime
        if props.ctime:
            attrs['st_ctime'] = props.ctime
        return attrs

    @errorlogged
    def readdir(self, path, fh):
        """
        Returns '.', '..' and the names of the entries listed under path.

        The listing is built eagerly rather than yielded lazily: a generator body
        would only run when FUSE iterates it, after the error logging/translation
        wrappers have already returned, so client errors would escape them.
        """
        dirents = ['.', '..']
        prefix = fuseutils.append_delimiter(path, self._delimiter)
        for f in self._client.ls(prefix):
            f_name = f.name.rstrip(self._delimiter)
            # .DS_Store is hidden on every platform, other metadata files only on macOS.
            if self._is_skipped_mac_files(f_name) or f_name == '.DS_Store':
                continue
            if f_name:
                dirents.append(f_name)
        return dirents

    def readlink(self, path):
        raise UnsupportedOperationException()

    def mknod(self, path, mode, dev):
        raise UnsupportedOperationException()

    @syncronized
    @errorlogged
    def rmdir(self, path):
        self._client.rmdir(path)

    @syncronized
    @errorlogged
    def mkdir(self, path, mode):
        self._client.mkdir(path)

    @errorlogged
    def statfs(self, path):
        """Reports a fixed 1 PiB volume that is almost entirely free."""
        BLOCK_SIZE = 4096
        # Report 1 Petabyte as a total volume
        BLOCK_TOTAL = int(1 * 1024 * 1024 * 1024 * 1024 * 1024 / BLOCK_SIZE)
        BLOCK_AVAIL = BLOCK_TOTAL - 1
        stats = {
            'f_bavail': BLOCK_AVAIL,
            'f_blocks': BLOCK_TOTAL,
            'f_bsize': BLOCK_SIZE,
            'f_bfree': BLOCK_AVAIL
        }
        if not self._is_win:
            # Fragment size matches the block size so the reported totals stay consistent.
            stats['f_frsize'] = BLOCK_SIZE
            stats['f_namemax'] = 255
        return stats

    @syncronized
    @errorlogged
    def unlink(self, path):
        self._client.delete(path)

    def symlink(self, name, target):
        raise UnsupportedOperationException()

    @syncronized
    @errorlogged
    def rename(self, old, new):
        # Only the source path is locked by syncronized.
        self._client.mv(old, new)

    def link(self, target, name):
        raise UnsupportedOperationException()

    @errorlogged
    def utimens(self, path, times=None):
        self._client.utimens(path, times)

    # File methods
    # ============

    @syncronized
    @errorlogged
    def open(self, path, flags):
        """
        Allocates a file handle for an existing path.

        :raises NotFoundOperationException: If the path does not exist.
        """
        if self._client.exists(path):
            return self._container.get()
        raise NotFoundOperationException()

    @syncronized
    @errorlogged
    def create(self, path, mode, fi=None):
        """Creates an empty file at path and allocates a file handle for it."""
        self._client.upload([], path)
        return self._container.get()

    @syncronized
    @errorlogged
    def read(self, path, length, offset, fh):
        """Returns up to length bytes of path starting at offset."""
        with io.BytesIO() as file_buff:
            self._client.download_range(fh, file_buff, path, offset=offset, length=length)
            return file_buff.getvalue()

    @syncronized
    @errorlogged
    def write(self, path, buf, offset, fh):
        """Writes buf to path at offset and reports the whole buffer as written."""
        self._client.upload_range(fh, buf, path, offset=offset)
        return len(buf)

    @syncronized
    @errorlogged
    def truncate(self, path, length, fh=None):
        self._client.truncate(fh, path, length)

    @syncronized
    @errorlogged
    def flush(self, path, fh):
        self._client.flush(fh, path)

    @syncronized
    @errorlogged
    def release(self, path, fh):
        """Returns the file handle to the pool; the client is not called."""
        self._container.release(fh)

    @syncronized
    @errorlogged
    def fsync(self, path, fdatasync, fh):
        # fdatasync is not distinguished: a full flush is always performed.
        self._client.flush(fh, path)

    @syncronized
    @errorlogged
    def fallocate(self, path, mode, offset, length, fh):
        """
        Grows the file so that it covers at least offset + length bytes.

        Non-zero modes are only logged; the file is grown in any case.

        :raises NotFoundOperationException: If the path does not exist.
        """
        props = self._client.attrs(path)
        if not props:
            raise NotFoundOperationException()
        if mode:
            # See http://man7.org/linux/man-pages/man2/fallocate.2.html.
            # https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/falloc.h
            logging.warning('Fallocate mode (%s) is not supported yet.', mode)
        # Only grow the file; a truncate to the current size would be a no-op round trip.
        if offset + length > props.size:
            self._client.truncate(fh, path, offset + length)

    @syncronized
    @errorlogged
    def setxattr(self, path, name, value, options, *args):
        self._client.upload_xattr(path, name, value)
        return 0

    @errorlogged
    def getxattr(self, path, name, *args):
        """
        Returns the value of the extended attribute name.

        :raises NoDataOperationException: If the attribute is not set.
        """
        xattrs = self._client.download_xattrs(path) or {}
        xattr = xattrs.get(name)
        if xattr is None:
            raise NoDataOperationException()
        return xattr

    @errorlogged
    def listxattr(self, path):
        """Returns a list of the extended attribute names of path."""
        xattrs = self._client.download_xattrs(path) or {}
        # A list snapshot rather than a live dict view, identical on Python 2 and 3.
        return list(xattrs)

    @syncronized
    @errorlogged
    def removexattr(self, path, name):
        self._client.remove_xattr(path, name)
        return 0
class ResilientFS(ChainingService):

    def __init__(self, inner):
        """
        Resilient File System.

        It properly handles underlying file system errors: operation exceptions
        raised by the inner file system are translated into ``FuseOSError`` with
        a matching errno, and unexpected errors are logged and reported as EINVAL.

        :param inner: Decorating file system.
        """
        self._inner = inner

    def __getattr__(self, name):
        # Attributes missing on the inner file system resolve to None instead of
        # raising AttributeError. NOTE(review): presumably so that missing
        # operations look "absent" to callers probing with getattr - confirm.
        if not hasattr(self._inner, name):
            return None
        attr = getattr(self._inner, name)
        if not callable(attr):
            return attr
        return self._wrap(attr)

    def __call__(self, name, *args, **kwargs):
        # Operations the inner file system lacks are looked up on this instance.
        if not hasattr(self._inner, name):
            return getattr(self, name)(*args, **kwargs)
        attr = getattr(self._inner, name)
        return self._wrap(attr)(*args, **kwargs)

    def _wrap(self, attr):
        """Wraps a callable so that its exceptions are translated into FuseOSError."""
        @functools.wraps(attr)
        def _wrapped_attr(*args, **kwargs):
            try:
                return attr(*args, **kwargs)
            except UnsupportedOperationException:
                raise FuseOSError(errno.ENOTSUP)
            except ForbiddenOperationException:
                raise FuseOSError(errno.EACCES)
            except NotFoundOperationException:
                raise FuseOSError(errno.ENOENT)
            except NoDataOperationException:
                raise FuseOSError(errno.ENODATA)
            except InvalidOperationException:
                raise FuseOSError(errno.EINVAL)
            except FuseOSError:
                # Already carries the errno chosen by the inner file system; keep it.
                raise
            except Exception as e:
                err_msg = str(e)
                if isinstance(e, ClientError) and err_msg and 'InvalidObjectState' in err_msg:
                    if 'storage class' in err_msg:
                        logging.exception('Failed to access archived file. This file shall be restored first.')
                        raise FuseOSError(errno.EACCES)
                    if 'access tier' in err_msg:
                        logging.exception('Failed to access archived file. Contact storage owner to restore file.')
                        raise FuseOSError(errno.EACCES)
                logging.exception('Uncaught exception from underlying file system.')
                raise FuseOSError(errno.EINVAL)
        return _wrapped_attr
class RestrictingOperationsFS(ChainingService):

    def __init__(self, inner, exclude):
        """
        Restricting operations File System.

        Forwards operations to the inner file system, except those listed in
        ``exclude``, which are rejected with ``UnsupportedOperationException``.

        :param inner: Decorating file system.
        :param exclude: Excluding operations.
        """
        self._inner = inner
        self._exclude = exclude

    def __getattr__(self, name):
        # Attributes missing on the inner file system resolve to None.
        if hasattr(self._inner, name):
            target = getattr(self._inner, name)
            return self._wrap(target, name=name) if callable(target) else target
        return None

    def __call__(self, name, *args, **kwargs):
        if hasattr(self._inner, name):
            guarded = self._wrap(getattr(self._inner, name), name=name)
            return guarded(*args, **kwargs)
        # Operations the inner file system lacks are looked up on this instance.
        return getattr(self, name)(*args, **kwargs)

    def _wrap(self, attr, name=None):
        """Wraps a callable so that it is rejected when its operation is excluded."""
        @functools.wraps(attr)
        def _wrapped_attr(*args, **kwargs):
            # Without an explicit name the operation name is the first positional argument.
            operation = name if name else args[0]
            if operation not in self._exclude:
                return attr(*args, **kwargs)
            logging.debug('Aborting excluded operation %s processing...', operation)
            raise UnsupportedOperationException()
        return _wrapped_attr

    def parameters(self):
        """Returns the layer parameters: the comma-joined excluded operations, if any."""
        if not self._exclude:
            return {}
        return {'exclude': ','.join(self._exclude)}