# workflows/pipe-common/scripts/watch_mount_shares.py
# Copyright 2017-2021 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
import datetime
from distutils.spawn import find_executable
import errno
import logging
import os
import psutil
import re
import subprocess
import shutil
import signal
import socket
import time
from pipeline import PipelineAPI
from watchdog.observers.inotify import InotifyObserver
from watchdog.events import FileSystemEventHandler, FileMovedEvent
from watchdog.observers.api import ObservedWatch
from logging.handlers import RotatingFileHandler
# Shell command templates used to (re)mount and unmount NFS/Lustre shares.
NFS_UNMOUNT_CMD_PATTERN = "umount -l -f \"{}\""
NFS_MOUNT_CMD_PATTERN = 'mount -t {} -o \"{}\" \"{}\" \"{}\"'
REMOUNT_OPTION = 'remount'
READ_WRITE_OPTION = 'rw'
READ_ONLY_OPTION = 'ro'
# Field separator inside the 'modified mounts' state file records.
MODIFIED_MNT_SEPARATOR = '|'
# Separator between a Lustre host and its FS path in a mount source (host@tcp:/fs).
LNET_SPLIT = '@tcp:/'
# Bit in a storage permissions mask that grants write access.
WRITE_MASK = 1 << 1
# Storage mount status values (same literals the Cloud Pipeline API uses).
MOUNT_STATUS_DISABLED = 'MOUNT_DISABLED'
MOUNT_STATUS_READ_ONLY = 'READ_ONLY'
MOUNT_STATUS_ACTIVE = 'ACTIVE'
MOUNT_STATUS_UNKNOWN = 'UNKNOWN'
DEFAULT_MOUNT_STATUS = os.getenv('CP_CAP_NFS_OBSERVER_DEFAULT_STORAGE_STATUS', 'UNKNOWN')
NEWLINE = '\n'
COMMA = ','
# `mount -t <fs types>` listing command and the regexp parsing its output lines.
MNT_LISTING_COMMAND = 'mount -t {}'
MNT_PARSING_REGEXP = r"(.*) on (.*) type (.*) \((.*)\)"
DT_FORMAT = '%Y-%m-%d %H:%M:%S:%f'
# Transfer command templates: `pipe` CLI is preferred, `aws` CLI is the fallback.
PIPE_CP_TEMPLATE = 'pipe storage cp \'{}\' \'{}\''
AWS_CP_TEMPLATE = 'aws s3 cp \'{}\' \'{}\' --only-show-errors --sse aws:kms'
# Single-letter event type codes written to the events file.
CREATE_EVENT = 'c'
MODIFY_EVENT = 'm'
MOVED_FROM_EVENT = 'mf'
MOVED_TO_EVENT = 'mt'
DELETE_EVENT = 'd'
# Estimated memory footprint of a single inotify watcher, used to cap the limit.
WATCHER_MEM_CONSUMPTION_BYTES = 1024
WATCHERS_USAGE_MEMORY_RATE = int(os.getenv('CP_CAP_NFS_MNT_OBSERVER_WATCHERS_MEM_CONSUMPTION_MAX_RATE', 100))
# sysctl parameter names controlling inotify capacity.
INOTIFY_MAX_WATCHERS = 'fs.inotify.max_user_instances'
INOTIFY_MAX_WATCHED_DIRS = 'fs.inotify.max_user_watches'
INOTIFY_MAX_QUEUED_EVENTS = 'fs.inotify.max_queued_events'
# Buffered-events threshold that forces an early dump to storage.
EVENTS_LIMIT = int(os.getenv('CP_CAP_NFS_OBSERVER_EVENTS_LIMIT', 1000))
MNT_RESYNC_TIMEOUT_SEC = int(os.getenv('CP_CAP_NFS_OBSERVER_MNT_RESYNC_TIMEOUT_SEC', 10))
EVENT_DUMPING_TIMEOUT_SEC = int(os.getenv('CP_CAP_NFS_OBSERVER_DUMPING_TIMEOUT_SEC', 30))
TARGET_FS_TYPES = os.getenv('CP_CAP_NFS_OBSERVER_TARGET_FS_TYPES', 'nfs4,lustre')
# Total size budget (MB) for local event files and how many rotated backups to keep.
EVENT_FILES_LIMIT_MB = int(os.getenv('CP_CAP_NFS_OBSERVER_EVENT_FILES_LIMIT_MB', 155))
EVENT_FILES_BACKUP_COUNT = max(1, int(os.getenv('CP_CAP_NFS_OBSERVER_EVENT_FILES_BACKUP_COUNT', 30)))
# Comma-separated lists of mount points / mount sources that must not be observed.
IGNORED_MOUNTPOINTS = os.getenv('CP_CAP_NFS_OBSERVER_IGNORE_MOUNTPOINTS', '').split(',')
IGNORED_SOURCES = os.getenv('CP_CAP_NFS_OBSERVER_IGNORE_SOURCES', '').split(',')
logging_format = os.getenv('CP_CAP_NFS_OBSERVER__LOGGING_FORMAT', '%(message)s')
logging_level = os.getenv('CP_CAP_NFS_OBSERVER_LOGGING_LEVEL', 'WARNING')
logging.basicConfig(level=logging_level, format=logging_format)
def format_message(message):
    """Prefix *message* with the current UTC timestamp for console logging."""
    timestamp = current_utc_time_str()
    return '[{}] {}'.format(timestamp, message)
def mkdir(path):
    """Create *path* (with parents), tolerating an already-existing directory."""
    try:
        os.makedirs(path)
    except OSError as e:
        # A pre-existing directory is fine; any other failure is re-raised.
        if not (e.errno == errno.EEXIST and os.path.isdir(path)):
            raise
def configure_kernel_param(key, value):
    """Set a sysctl kernel parameter to *value* and reload the sysctl configuration."""
    command = 'sysctl -w {}={} && sysctl -p'.format(key, value)
    execute_cmd_command_and_get_stdout_stderr(command, silent=True)
def execute_cmd_command_and_get_stdout_stderr(command, silent=False, executable=None):
    """Run *command* through the shell and return (exit_code, stdout, stderr).

    Unless *silent* is set, captured stderr and stdout are echoed to the console.
    Passing executable=None to Popen is equivalent to omitting the argument.
    """
    process = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               executable=executable)
    stdout, stderr = process.communicate()
    if not silent:
        # Preserve the original reporting order: stderr first, then stdout.
        for captured in (stderr, stdout):
            if captured:
                print(captured)
    return process.wait(), stdout, stderr
def execute_command(command, max_attempts=1):
    """Execute *command* silently, retrying up to *max_attempts* times.

    Returns a (stdout, success) pair where success is True when some attempt
    exited with code 0; stdout is taken from the last attempt made.
    """
    stdout = None
    success = False
    for _ in range(max_attempts):
        exit_code, stdout, stderr = execute_cmd_command_and_get_stdout_stderr(command, silent=True)
        success = exit_code == 0
        if success:
            break
        logging.error(format_message(
            'Execution of [{}] failed due to the reason: {}'.format(command, stderr.rstrip(NEWLINE))))
    return stdout, success
def current_utc_time():
    """Return the current UTC time as a naive datetime."""
    return datetime.datetime.utcnow()
def current_utc_time_millis():
    """Return the current UTC time as integer milliseconds since the Unix epoch."""
    epoch_delta = current_utc_time() - datetime.datetime(1970, 1, 1)
    return int(epoch_delta.total_seconds() * 1000)
def current_utc_time_str():
    """Return the current UTC time formatted with DT_FORMAT."""
    return current_utc_time().strftime(DT_FORMAT)
class MountPointDetails:
    """Value object describing one entry of the system mount table."""

    def __init__(self, mount_source, mount_point, mount_type, mount_attributes):
        # Remote source, local mount location, FS type and the raw options string.
        self.mount_source = mount_source
        self.mount_point = mount_point
        self.mount_type = mount_type
        self.mount_attributes = mount_attributes

    @staticmethod
    def from_array(array):
        """Build a MountPointDetails from a 4-element sequence; None otherwise."""
        if len(array) != 4:
            return None
        source, point, fs_type, attributes = array
        return MountPointDetails(source, point, fs_type, attributes)
class Event:
    """A single FS activity record: which path changed, how, and when."""

    def __init__(self, path, event_type):
        self.path = path
        self.event_type = event_type
        # Millisecond UTC timestamp captured at construction time.
        self.timestamp = current_utc_time_millis()
class CloudBucketDumpingEventHandler(FileSystemEventHandler):
    """Collects file system events and periodically dumps them to a cloud bucket.

    Events are de-duplicated per file path (see _insert_event), buffered in
    memory, flushed to a local rotating log file and then transferred to the
    configured bucket via the `pipe` CLI (preferred) or the `aws` CLI.
    """

    def __init__(self):
        # FIX: super() previously started the MRO walk at FileSystemEventHandler,
        # which skipped the direct base class' own initialization.
        super(CloudBucketDumpingEventHandler, self).__init__()
        # path -> latest pending Event, ordered by first registration
        self._active_events = OrderedDict()
        # local mount point -> mount source; used to relativize event paths
        self._target_path_mapping = dict()
        self._activity_logging_local_dir = self.configure_logging_local_dir()
        self._activity_logging_bucket_dir = self._configure_logging_bucket_dir()
        self._transfer_template = self._get_available_transfer_template()
        self.last_dump_time = current_utc_time()
        self._init_events_logger()

    @staticmethod
    def _get_available_transfer_template():
        """Pick a transfer command template, preferring `pipe` over `aws`.

        :raises RuntimeError: when neither CLI is available on PATH.
        """
        if find_executable('pipe'):
            transfer_template = PIPE_CP_TEMPLATE
        elif find_executable('aws'):
            transfer_template = AWS_CP_TEMPLATE
        else:
            raise RuntimeError(
                'Unable to start CloudBucketDumpingEventHandler: no suitable command for transfer available')
        logging.info(format_message('[{}] will be used as a transfer template'.format(transfer_template)))
        return transfer_template

    @staticmethod
    def _configure_logging_bucket_dir():
        """Return the bucket directory the event files should be uploaded to."""
        bucket_dir = os.path.join(os.getenv('CP_CAP_NFS_MNT_OBSERVER_TARGET_BUCKET', ''),
                                  CloudBucketDumpingEventHandler._get_service_name())
        logging.info(format_message('Destination bucket location is [{}]'.format(bucket_dir)))
        return bucket_dir

    @staticmethod
    def _get_service_name():
        """Identify the current service by env vars: a run id, `api`, `dav` or `unknown`."""
        service_name = os.getenv('RUN_ID')
        if not service_name:
            if os.getenv('CP_API_HOME'):
                service_name = 'api'
            elif os.getenv('CP_DAV_MOUNT_POINT'):
                service_name = 'dav'
            else:
                service_name = 'unknown'
        return service_name

    @staticmethod
    def configure_logging_local_dir():
        """Return (creating, if required) the local directory for event log files."""
        local_logging_dir = os.path.join(os.getenv('RUN_DIR', '/tmp'), 'fs_watcher')
        logging.info(format_message('Local storage directory is [{}]'.format(local_logging_dir)))
        mkdir(local_logging_dir)
        return local_logging_dir

    def _init_events_logger(self):
        """Configure a dedicated rotating-file logger for serialized events."""
        self._events_local_file = os.path.join(self._activity_logging_local_dir, 'nfs_events')
        # The overall size budget is shared between the active file and its backups.
        single_event_file_size_limit_mb = EVENT_FILES_LIMIT_MB / (EVENT_FILES_BACKUP_COUNT + 1)
        events_logging_handler = RotatingFileHandler(self._events_local_file, mode='a',
                                                     backupCount=EVENT_FILES_BACKUP_COUNT,
                                                     maxBytes=single_event_file_size_limit_mb * 1024 * 1024)
        events_logging_handler.setFormatter(logging.Formatter('%(message)s'))
        self._events_logger = logging.getLogger('nfs_events')
        self._events_logger.addHandler(events_logging_handler)
        self._events_logger.setLevel(logging.INFO)
        self._events_logger.propagate = False

    def _convert_event_to_str(self, event):
        """Serialize an event as `timestamp,type,mount_source,relative_path`.

        Returns None when the event path is not under any watched mount point.
        """
        path = event.path
        for mnt_dest, mnt_src in self._target_path_mapping.items():
            if path.startswith(mnt_dest):
                return COMMA.join([str(event.timestamp),
                                   event.event_type,
                                   mnt_src,
                                   path[len(mnt_dest) + 1:]])
        return None

    def _insert_event(self, file_path, event_type):
        """Register an event, collapsing it with the pending one for the same path.

        Rules:
        - a `create` cancelled by a `delete`/`moved-from` removes the entry entirely
          (the file never existed from the consumer's point of view);
        - a `create` followed by any other event stays a `create` (the file is new);
        - otherwise the most recent event replaces the previous one.
        """
        if file_path not in self._active_events:
            self._active_events[file_path] = Event(file_path, event_type)
            return
        last_event_type = self._active_events[file_path].event_type
        if last_event_type == CREATE_EVENT:
            if event_type == DELETE_EVENT or event_type == MOVED_FROM_EVENT:
                del self._active_events[file_path]
            # FIX: keep the original `create` event otherwise - previously this
            # branch referenced an unbound local and raised UnboundLocalError.
            return
        self._active_events[file_path] = Event(file_path, event_type)

    def update_target_mounts_mappings(self, new_mappings):
        """Merge fresh `mount point -> mount source` entries into the mapping."""
        self._target_path_mapping.update(new_mappings)

    def dispatch(self, event):
        """Receive a raw watchdog event and buffer it; flush first if the buffer is full."""
        if len(self._active_events) > EVENTS_LIMIT:
            self.dump_to_storage()
        if not event.is_directory:
            if type(event) is FileMovedEvent:
                self._insert_event(event.src_path, MOVED_FROM_EVENT)
                self._insert_event(event.dest_path, MOVED_TO_EVENT)
            # TODO check if filtering could be applied at observer creation
            elif event.event_type != 'closed':
                # Event types are stored by their first letter: c/m/d.
                self._insert_event(event.src_path, event.event_type[:1])

    def check_timeout_and_dump(self):
        """Flush buffered events once the dumping timeout has elapsed."""
        if (current_utc_time() - self.last_dump_time).total_seconds() > EVENT_DUMPING_TIMEOUT_SEC:
            self.dump_to_storage()

    def dump_to_storage(self):
        """Write buffered events to the local log and upload all event files to the bucket."""
        if len(self._active_events) != 0:
            logging.info(format_message('Saving events to {} '.format(self._events_local_file)))
            # TODO sorting might be skipped in case events will be sorted in the consuming service
            sorted_events = sorted(self._active_events.values(), key=lambda e: e.timestamp)
            for event in sorted_events:
                event_str = self._convert_event_to_str(event)
                # FIX: events outside of any watched mount used to be logged as the
                # literal string `None`; such events are skipped now.
                if event_str:
                    self._events_logger.info(event_str)
            logging.info(format_message('Cleaning activity list'))
            self._active_events.clear()
        # Upload rolled-over backups first (oldest has the highest suffix), then the active file.
        for rollover_backup in range(EVENT_FILES_BACKUP_COUNT, 0, -1):
            self._dump_event_file('{}.{}'.format(self._events_local_file, rollover_backup))
        self._dump_event_file(self._events_local_file)

    def _dump_event_file(self, local_event_file):
        """Upload a single non-empty event file to the bucket; truncate it on success."""
        if not os.path.exists(local_event_file) or os.stat(local_event_file).st_size == 0:
            return
        bucket_filename = 'events-' + current_utc_time_str().replace(' ', '_')
        bucket_file = os.path.join(self._activity_logging_bucket_dir, bucket_filename)
        logging.info(format_message('Dumping events to {} '.format(bucket_file)))
        _, result = execute_command(self._transfer_template.format(local_event_file, bucket_file))
        if result:
            self.last_dump_time = current_utc_time()
            # Truncate the local file once its content is safely in the bucket.
            with open(local_event_file, 'w'):
                pass
class NFSMountWatcher:
    """Observes NFS/Lustre mount points with inotify and forwards FS events to the handler.

    Besides event observation, the watcher reconciles mount state
    (ACTIVE / READ_ONLY / MOUNT_DISABLED) against the Cloud Pipeline API via a
    'modified mounts' state file, remounting or unmounting shares on status change.
    """
    # Persisted inotify watchers limit; survives observer restarts (see try_increase_watchers_limit).
    WATCHER_LIMIT_FILE = '/tmp/.inotify_limit'

    def __init__(self, target_paths=None):
        # target_paths: optional comma-separated list of paths to observe; when absent,
        # the paths are discovered from the active system mounts on each resync.
        self._target_path_mapping = dict()
        self._event_handler = CloudBucketDumpingEventHandler()
        self._event_observer = InotifyObserver()
        self._set_signal_handlers()
        self._watchers_limit = self._configure_watchers_limit()
        self._terminated = False
        if target_paths:
            self._target_paths = target_paths.split(COMMA)
            self._update_static_paths_mapping()
        else:
            self._target_paths = None
            self._update_target_mount_points()

    @staticmethod
    def _calculate_max_allowed_watchers():
        # Cap watchers by a percentage (WATCHERS_USAGE_MEMORY_RATE) of total RAM,
        # assuming WATCHER_MEM_CONSUMPTION_BYTES per watcher.
        # NOTE(review): relies on integer division semantics (Python 2) - on Python 3
        # this yields a float; confirm the interpreter version used at runtime.
        return psutil.virtual_memory().total / WATCHER_MEM_CONSUMPTION_BYTES / 100 * WATCHERS_USAGE_MEMORY_RATE

    @staticmethod
    def _get_watchers_limit():
        """Read the watchers limit from the persisted file, falling back to the env var."""
        if os.path.exists(NFSMountWatcher.WATCHER_LIMIT_FILE):
            logging.info('Reading watchers configuration from the file...')
            try:
                with open(NFSMountWatcher.WATCHER_LIMIT_FILE, "r") as persistent_conf_file:
                    content = persistent_conf_file.readlines()
                    if len(content) > 0:
                        return int(content[0])
            except BaseException as e:
                # NOTE(review): `e.message` exists on Python 2 exceptions only.
                logging.error('Error during limit retrieval from the file: {}'.format(e.message))
        logging.info('Reading watchers configuration from the env var...')
        return int(os.getenv('CP_CAP_NFS_MNT_OBSERVER_RUN_WATCHERS', 65535))

    def _configure_watchers_limit(self):
        """Apply the watchers limit to the inotify sysctl parameters and return it."""
        watchers_limit = self._get_watchers_limit()
        logging.info(format_message('Watchers configuration: [{}]'.format(watchers_limit)))
        configure_kernel_param(INOTIFY_MAX_WATCHERS, watchers_limit)
        configure_kernel_param(INOTIFY_MAX_WATCHED_DIRS, watchers_limit)
        configure_kernel_param(INOTIFY_MAX_QUEUED_EVENTS, 2 * watchers_limit)
        return watchers_limit

    def _set_signal_handlers(self):
        # Graceful shutdown on SIGTERM/SIGINT.
        for signal_code in [signal.SIGTERM, signal.SIGINT]:
            signal.signal(signal_code, self._shutdown_and_exit)

    def is_terminated(self):
        """Return True when a termination signal has been handled."""
        return self._terminated

    def _shutdown_and_exit(self, signal_code, frame):
        # Signal handler: flush events, release observer resources and exit.
        self.release_resources()
        self._terminated = True
        exit(0)

    def _process_active_target_path(self, mnt_dest, mnt_src):
        """Register a new watcher for the path or refresh the source of an existing one."""
        if mnt_dest not in self._target_path_mapping:
            self._add_new_mount_watcher(mnt_dest, mnt_src)
        else:
            self._update_existing_mount_watcher(mnt_dest, mnt_src)

    def _update_static_paths_mapping(self):
        """Sync watchers for the explicitly configured target paths.

        When a path is not an active mount, the path itself is used as the source.
        """
        logging.info(
            format_message('Observing FS events on [{}] target paths specified...'.format(len(self._target_paths))))
        active_mounts = self._get_target_mount_points()
        for mnt_dest in self._target_paths:
            mnt_src = active_mounts.get(mnt_dest, mnt_dest)
            self._process_active_target_path(mnt_dest, mnt_src)
        self._event_handler.update_target_mounts_mappings(self._target_path_mapping)

    def _update_target_mount_points(self):
        """Sync watchers with the currently active system mounts."""
        logging.info(format_message('Checking active mounts...'))
        latest_mounts_mapping = self._get_target_mount_points()
        logging.info(format_message('Found [{}] active mounts'.format(len(latest_mounts_mapping))))
        self._remove_unmounted_watchers(latest_mounts_mapping)
        for mnt_dest, mnt_src in latest_mounts_mapping.items():
            self._process_active_target_path(mnt_dest, mnt_src)
        self._event_handler.update_target_mounts_mappings(self._target_path_mapping)

    def _update_existing_mount_watcher(self, mnt_dest, mnt_src):
        """Refresh the recorded mount source when it changed for a watched path."""
        active_mount_src = self._target_path_mapping[mnt_dest]
        if active_mount_src != mnt_src:
            logging.warning(format_message(
                'Updating observer: [{}] mount-source changed from [{}] to [{}]'.format(mnt_dest,
                                                                                        active_mount_src,
                                                                                        mnt_src)))
            self._target_path_mapping[mnt_dest] = mnt_src

    def _add_new_mount_watcher(self, mnt_dest, mnt_src):
        """Schedule observation of a new path; record the mapping only on success."""
        logging.warning(format_message('Assigning [{}] to the observer'.format(mnt_dest)))
        if self.try_to_add_path_to_observer(mnt_dest):
            self._target_path_mapping[mnt_dest] = mnt_src

    def _remove_unmounted_watchers(self, latest_mounts_mapping):
        """Drop watchers for paths that are no longer mounted."""
        # NOTE(review): pops from the dict while iterating `.items()` - safe on
        # Python 2 (list copy) but raises RuntimeError on Python 3; verify the
        # target interpreter.
        for mnt_dest, mnt_src in self._target_path_mapping.items():
            if mnt_dest not in latest_mounts_mapping:
                logging.warning(format_message('Removing observer from [{}]'.format(mnt_dest)))
                if self.try_to_remove_path_from_observer(mnt_dest):
                    self._target_path_mapping.pop(mnt_dest)

    def try_to_remove_path_from_observer(self, mnt_dest):
        """Unschedule observation of a path; return True unless unscheduling errored.

        The last remaining watch is deliberately never unscheduled, although True
        is still returned so the mapping entry gets removed.
        """
        try:
            if len(self._target_path_mapping.items()) > 1:
                self._event_observer.unschedule(ObservedWatch(mnt_dest, True))
            return True
        except OSError as e:
            # NOTE(review): `e.message` is Python 2 only.
            logging.error(
                format_message('Unable to drop observation on [{}], an error occurred: {}'.format(mnt_dest, e.message)))
            return False

    def try_to_add_path_to_observer(self, mnt_dest):
        """Recursively schedule observation of a path; return True on success."""
        if not os.path.exists(mnt_dest):
            logging.warning(format_message('Target path [{}] doesn\'t exist, skipping...'.format(mnt_dest)))
            return False
        try:
            self._event_observer.schedule(self._event_handler, mnt_dest, recursive=True)
            return True
        except OSError as e:
            logging.error(format_message('Unable to assign [{}], an error occurred: {}'.format(mnt_dest, e.message)))
            return False

    @staticmethod
    def _get_target_mount_points():
        """Scan system mounts, persist their current statuses and reconcile changes.

        Returns a `mount point -> mount source` dict of mounts considered ACTIVE.
        """
        available_storages_dict = NFSMountWatcher._get_available_storages_dict()
        is_admin = NFSMountWatcher._is_admin()
        out, res = execute_command(MNT_LISTING_COMMAND.format(TARGET_FS_TYPES))
        if not res or not out:
            logging.info(format_message('Unable to retrieve [{}] mounts'.format(TARGET_FS_TYPES)))
        else:
            for line in out.split(NEWLINE):
                if line:
                    # NOTE(review): re.search() returning None (unexpected line format)
                    # would raise AttributeError here - assumed not to happen for
                    # `mount -t` output.
                    mnt_details = re.search(MNT_PARSING_REGEXP, line).groups()
                    if len(mnt_details) == 4:
                        mount_details = MountPointDetails.from_array(mnt_details)
                        if NFSMountWatcher._shall_ignore_mountpoint(mount_details):
                            logging.info(format_message('Ignoring [{}] mount'.format(mount_details.mount_source)))
                            continue
                        mount_attributes = mount_details.mount_attributes.split(COMMA)
                        # An `rw` option means the share is currently writable.
                        if READ_WRITE_OPTION in mount_attributes:
                            NFSMountWatcher.save_details_status_active(mount_details)
                        else:
                            NFSMountWatcher.save_details_status_readonly(mount_details)
        # Promote the freshly scanned statuses (written to the tmp file) to the main state file.
        NFSMountWatcher.move_mount_statuses_from_tmp_file()
        mount_points = NFSMountWatcher._process_modified_mounts(available_storages_dict, is_admin)
        return mount_points

    @staticmethod
    def _shall_ignore_mountpoint(mountpoint):
        """Return True when the mount is explicitly excluded via env configuration."""
        return mountpoint.mount_source in IGNORED_SOURCES or mountpoint.mount_point in IGNORED_MOUNTPOINTS

    @staticmethod
    def is_permission_set(storage, mask):
        """Return True when all bits of *mask* are present in the storage permissions mask."""
        return storage.mask & mask == mask

    @staticmethod
    def _get_mount_status(available_storages_dict, mount_details, is_admin, default):
        """Resolve the desired status of a mount for the current user.

        Admins always get ACTIVE; an unknown admin flag yields UNKNOWN; without a
        storages list the provided default is kept. A DISABLED storage, or an
        ACTIVE one without write permission, is degraded to READ_ONLY.
        """
        if is_admin is None:
            return MOUNT_STATUS_UNKNOWN
        elif is_admin:
            return MOUNT_STATUS_ACTIVE
        if available_storages_dict is None:
            return default
        matching_storage = NFSMountWatcher._find_matching_storage(available_storages_dict, mount_details)
        if matching_storage:
            status = matching_storage.mount_status
            if status == MOUNT_STATUS_DISABLED \
                    or (status == MOUNT_STATUS_ACTIVE
                        and not NFSMountWatcher.is_permission_set(matching_storage, WRITE_MASK)):
                return MOUNT_STATUS_READ_ONLY
            else:
                return status
        else:
            return DEFAULT_MOUNT_STATUS

    @staticmethod
    def _get_available_storages_dict():
        """Load storages from the API keyed by path, or None when the call fails."""
        api = PipelineAPI(os.getenv('API'), 'logs')
        try:
            available_storages = api.load_available_storages()
            available_storages_dict = {storage.path: storage for storage in available_storages}
            return available_storages_dict
        except RuntimeError as e:
            logging.warning(format_message('Unable to load available storage list: {}'.format(str(e))))
            return None

    @staticmethod
    def _is_admin():
        """Return True/False whether the current user has ROLE_ADMIN; None when unknown."""
        api = PipelineAPI(os.getenv('API'), 'logs')
        try:
            user = api.load_current_user()
            if 'roles' in user and user['roles']:
                roles = user['roles']
                roles_names = [role['name'] for role in roles]
                return 'ROLE_ADMIN' in roles_names
            else:
                return False
        except RuntimeError as e:
            logging.warning(format_message('Unable to load current user: {}'.format(str(e))))
            return None

    @staticmethod
    def _process_modified_mounts(available_storages_dict, is_admin):
        """Replay the 'modified mounts' state file, applying status transitions.

        Each record is re-saved (to the tmp file) with its resulting status; the
        tmp file finally replaces the main state file. Returns the dict of mounts
        that remain ACTIVE (`mount point -> mount source`).
        """
        mount_points = dict()
        modified_mounts_file = NFSMountWatcher.get_modified_mounts_file_path()
        if not os.path.exists(modified_mounts_file):
            logging.info(format_message('No modified mounts to check...'))
            return mount_points
        with open(modified_mounts_file, "r") as modified_mounts:
            lines = modified_mounts.readlines()
        if len(lines) < 1:
            logging.info(format_message('No modified mounts to check...'))
            return mount_points
        else:
            logging.info(format_message('Processing modified mounts'))
            for line in lines:
                line = line.rstrip()
                if not line:
                    continue
                # Record format: source|point|type|attributes|status
                modified_mount_summary = line.split(MODIFIED_MNT_SEPARATOR)
                modified_mount_details = MountPointDetails.from_array(modified_mount_summary[0:-1])
                current_mount_status = modified_mount_summary[-1]
                new_mount_status = NFSMountWatcher._get_mount_status(available_storages_dict, modified_mount_details,
                                                                     is_admin, default=current_mount_status)
                if new_mount_status:
                    if current_mount_status == new_mount_status \
                            or new_mount_status == MOUNT_STATUS_UNKNOWN:
                        # Status unchanged (or undecidable): keep it as-is.
                        if current_mount_status == MOUNT_STATUS_ACTIVE:
                            mount_points[modified_mount_details.mount_point] = modified_mount_details.mount_source
                        NFSMountWatcher.save_mount_details_to_modified_mounts_file(modified_mount_details,
                                                                                   current_mount_status)
                    else:
                        NFSMountWatcher._process_modified_mount(current_mount_status, new_mount_status,
                                                                modified_mount_details, mount_points)
                else:
                    # No status could be resolved: keep the current one.
                    if current_mount_status == MOUNT_STATUS_ACTIVE:
                        mount_points[modified_mount_details.mount_point] = modified_mount_details.mount_source
                    NFSMountWatcher.save_mount_details_to_modified_mounts_file(modified_mount_details, current_mount_status)
        NFSMountWatcher.move_mount_statuses_from_tmp_file()
        return mount_points

    @staticmethod
    def move_mount_statuses_from_tmp_file():
        """Replace the main 'modified mounts' file with the tmp one (created empty if missing)."""
        modified_mounts_file = NFSMountWatcher.get_modified_mounts_file_path()
        tmp_modified_mounts_file = NFSMountWatcher.get_modified_mounts_file_path(use_tmp_file=True)
        if not os.path.exists(tmp_modified_mounts_file):
            os.mknod(tmp_modified_mounts_file)
        logging.info(format_message('Finalizing modified mounts from temp file'))
        shutil.move(tmp_modified_mounts_file, modified_mounts_file)

    @staticmethod
    def _process_modified_mount(current_mount_status, new_mount_status, modified_mount_details, mount_points):
        """Dispatch a status transition to the handler matching the current status."""
        if current_mount_status == MOUNT_STATUS_DISABLED:
            NFSMountWatcher.process_currently_disabled_mount(modified_mount_details, mount_points, new_mount_status)
        elif current_mount_status == MOUNT_STATUS_READ_ONLY:
            NFSMountWatcher.process_currently_ro_mount(modified_mount_details, mount_points, new_mount_status)
        elif current_mount_status == MOUNT_STATUS_ACTIVE:
            NFSMountWatcher.process_currently_active_mount(modified_mount_details, mount_points, new_mount_status)
        else:
            logging.warning(format_message('Unknown mount status [{}]'.format(new_mount_status)))
            NFSMountWatcher.save_mount_details_to_modified_mounts_file(modified_mount_details, current_mount_status)

    @staticmethod
    def process_currently_ro_mount(modified_mount_details, mount_points, new_mount_status):
        """Transition a READ_ONLY mount to DISABLED (unmount) or ACTIVE (rw remount)."""
        if new_mount_status == MOUNT_STATUS_DISABLED:
            if NFSMountWatcher.try_to_unmount(modified_mount_details):
                NFSMountWatcher.save_details_status_disabled(modified_mount_details)
            else:
                # Unmount failed: keep the share tracked as read-only.
                NFSMountWatcher.save_details_status_readonly(modified_mount_details)
        elif new_mount_status == MOUNT_STATUS_ACTIVE:
            if NFSMountWatcher._try_remount_storage(modified_mount_details, False):
                NFSMountWatcher.save_details_status_active(modified_mount_details)
                mount_points[modified_mount_details.mount_point] = modified_mount_details.mount_source
            else:
                NFSMountWatcher.save_details_status_readonly(modified_mount_details)
        else:
            logging.warning(format_message('Unknown mount status [{}]'.format(new_mount_status)))
            NFSMountWatcher.save_details_status_readonly(modified_mount_details)

    @staticmethod
    def process_currently_disabled_mount(modified_mount_details, mount_points, new_mount_status):
        """Transition a DISABLED mount to READ_ONLY or ACTIVE by mounting it back."""
        if new_mount_status == MOUNT_STATUS_READ_ONLY:
            if NFSMountWatcher._try_mount_storage(modified_mount_details, True):
                NFSMountWatcher.save_details_status_readonly(modified_mount_details)
            else:
                NFSMountWatcher.save_details_status_disabled(modified_mount_details)
        elif new_mount_status == MOUNT_STATUS_ACTIVE:
            if NFSMountWatcher._try_mount_storage(modified_mount_details, False):
                NFSMountWatcher.save_details_status_active(modified_mount_details)
                mount_points[modified_mount_details.mount_point] = modified_mount_details.mount_source
            else:
                NFSMountWatcher.save_details_status_disabled(modified_mount_details)
        else:
            logging.warning(format_message('Unknown mount status [{}]'.format(new_mount_status)))
            NFSMountWatcher.save_details_status_disabled(modified_mount_details)

    @staticmethod
    def process_currently_active_mount(mount_details, mount_points, new_mount_status):
        """Transition an ACTIVE mount to DISABLED (unmount) or READ_ONLY (ro remount).

        On a failed transition (or an ACTIVE/unknown target status) the mount stays
        in the active mount_points mapping.
        NOTE(review): these transitions save with use_tmp_file=False (to the main
        file) unlike the other handlers - confirm this asymmetry is intentional.
        """
        if new_mount_status == MOUNT_STATUS_DISABLED:
            if NFSMountWatcher.try_to_unmount(mount_details):
                NFSMountWatcher.save_details_status_disabled(mount_details, use_tmp_file=False)
                return
        elif new_mount_status == MOUNT_STATUS_READ_ONLY:
            if NFSMountWatcher._try_remount_storage(mount_details, True):
                NFSMountWatcher.save_details_status_readonly(mount_details, use_tmp_file=False)
                return
        elif new_mount_status != MOUNT_STATUS_ACTIVE:
            logging.info(format_message('Received unknown status [{}] for [{}]'.format(new_mount_status,
                                                                                       mount_details.mount_point)))
        mount_points[mount_details.mount_point] = mount_details.mount_source

    @staticmethod
    def save_details_status_disabled(mount_details, use_tmp_file=True):
        """Persist the mount record with MOUNT_DISABLED status."""
        NFSMountWatcher.save_mount_details_to_modified_mounts_file(mount_details, MOUNT_STATUS_DISABLED,
                                                                   use_tmp_file=use_tmp_file)

    @staticmethod
    def save_details_status_readonly(modified_mount_details, use_tmp_file=True):
        """Persist the mount record with READ_ONLY status."""
        NFSMountWatcher.save_mount_details_to_modified_mounts_file(modified_mount_details, MOUNT_STATUS_READ_ONLY,
                                                                   use_tmp_file=use_tmp_file)

    @staticmethod
    def save_details_status_active(mount_details, use_tmp_file=True):
        """Persist the mount record with ACTIVE status."""
        NFSMountWatcher.save_mount_details_to_modified_mounts_file(mount_details, MOUNT_STATUS_ACTIVE,
                                                                   use_tmp_file=use_tmp_file)

    @staticmethod
    def save_mount_details_to_modified_mounts_file(mount_details, status, use_tmp_file=True):
        """Append a `source|point|type|attributes|status` record to the state file."""
        modified_mounts_file = NFSMountWatcher.get_modified_mounts_file_path(use_tmp_file=use_tmp_file)
        logging.info(format_message('Saving status of [{}]: [{}]'.format(mount_details.mount_point, status)))
        with open(modified_mounts_file, "a") as modified_mounts:
            original_mount_summary = MODIFIED_MNT_SEPARATOR.join([mount_details.mount_source,
                                                                  mount_details.mount_point,
                                                                  mount_details.mount_type,
                                                                  mount_details.mount_attributes,
                                                                  status])
            # Records are newline-prefixed; blank lines are tolerated by the reader.
            modified_mounts.write(NEWLINE + original_mount_summary)

    @staticmethod
    def _try_remount_storage(mount_details, read_only):
        """Remount an already mounted share switching its rw/ro mode."""
        return NFSMountWatcher._execute_mount_command(mount_details, read_only, True)

    @staticmethod
    def _try_mount_storage(mount_details, read_only):
        """Mount a currently unmounted share in the requested rw/ro mode."""
        return NFSMountWatcher._execute_mount_command(mount_details, read_only, False)

    @staticmethod
    def _execute_mount_command(mount_details, read_only, add_remount_flag):
        """Build and execute the mount command; return True on success."""
        mount_options = mount_details.mount_attributes.split(COMMA)
        # Flip the rw/ro option to match the requested mode.
        if read_only:
            mount_options = [READ_ONLY_OPTION if option == READ_WRITE_OPTION else option for option in mount_options]
        else:
            mount_options = [READ_WRITE_OPTION if option == READ_ONLY_OPTION else option for option in mount_options]
        if add_remount_flag:
            mount_options.append(REMOUNT_OPTION)
        mount_options = COMMA.join(mount_options)
        mkdir(mount_details.mount_point)
        mount_command = NFS_MOUNT_CMD_PATTERN.format(
            mount_details.mount_type, mount_options, mount_details.mount_source, mount_details.mount_point)
        logging.info(format_message('Trying to mount [{}] to [{}] in RO=[{}] mode'.format(mount_details.mount_source,
                                                                                          mount_details.mount_point,
                                                                                          read_only)))
        out, res = execute_command(mount_command)
        if not res:
            logging.warning(
                format_message('Unable to mount [{}] to [{}] in RO=[{}] mode: {}'.format(mount_details.mount_source,
                                                                                         mount_details.mount_point,
                                                                                         read_only,
                                                                                         out)))
        return res

    @staticmethod
    def try_to_unmount(mount_details):
        """Lazily force-unmount the share; return True on success."""
        logging.info(format_message('Trying to unmount [{}] from [{}]'.format(mount_details.mount_source,
                                                                              mount_details.mount_point)))
        out, res = execute_command(NFS_UNMOUNT_CMD_PATTERN.format(mount_details.mount_point))
        if not res:
            logging.warning(format_message('Unable to umount [{}]: {}'.format(mount_details.mount_point, out)))
        return res

    @staticmethod
    def get_modified_mounts_file_path(use_tmp_file=False):
        """Return the path of the (main or tmp) 'modified mounts' state file."""
        path = os.path.join(CloudBucketDumpingEventHandler.configure_logging_local_dir(), 'modified_mounts')
        if use_tmp_file:
            path += '-tmp'
        return path

    @staticmethod
    def _find_matching_storage(available_storages_dict, mount_details):
        """Find the API storage corresponding to a mount.

        Lustre sources are matched by resolving the storage host to an IP first
        (mount sources use IPs); other FS types are matched by path directly.
        """
        matching_storage = None
        if mount_details.mount_type == 'lustre':
            for path, storage in available_storages_dict.items():
                lustre_path_chunks = path.split(LNET_SPLIT, 1)
                if len(lustre_path_chunks) == 2:
                    lustre_host_ip = socket.gethostbyname(lustre_path_chunks[0])
                    lustre_target_source = lustre_host_ip + LNET_SPLIT + lustre_path_chunks[1]
                    if lustre_target_source == mount_details.mount_source:
                        matching_storage = storage
                        break
        else:
            matching_storage = available_storages_dict.get(mount_details.mount_source)
        return matching_storage

    def _get_mnt_resync_timeout(self):
        """Return the resync period, capped by the event dumping timeout."""
        if MNT_RESYNC_TIMEOUT_SEC > EVENT_DUMPING_TIMEOUT_SEC:
            logging.warning(
                format_message('Mount scanning timeout [{}] is greater, than dumping timeout [{}]. Using the second one'
                               .format(MNT_RESYNC_TIMEOUT_SEC, EVENT_DUMPING_TIMEOUT_SEC)))
            resync_timeout = EVENT_DUMPING_TIMEOUT_SEC
        else:
            resync_timeout = MNT_RESYNC_TIMEOUT_SEC
        return resync_timeout

    def try_increase_watchers_limit(self):
        """Persist a doubled watchers limit (applied on restart), respecting the RAM cap."""
        new_limit = 2 * self._watchers_limit
        max_allowed_watchers = self._calculate_max_allowed_watchers()
        if new_limit > max_allowed_watchers:
            logging.warning(format_message('Unable to increase limit up to [{}], maximum allowed is [{}]'
                                           .format(new_limit, max_allowed_watchers)))
            return
        logging.warning(format_message('Updating watchers limit in `{}` with [{}]'
                                       .format(NFSMountWatcher.WATCHER_LIMIT_FILE, new_limit)))
        with open(NFSMountWatcher.WATCHER_LIMIT_FILE, "w") as out:
            out.write(str(new_limit))

    def release_resources(self):
        """Stop the observer, unschedule watches and flush buffered events."""
        self._event_observer.stop()
        if len(self._target_path_mapping.items()) > 1:
            self._event_observer.unschedule_all()
        self._event_handler.dump_to_storage()

    def start(self):
        """Run the main loop: periodically resync mounts and dump collected events."""
        logging.info(format_message('Start monitoring shares state...'))
        self._event_observer.start()
        resync_timeout = self._get_mnt_resync_timeout()
        while True:
            time.sleep(resync_timeout)
            if self._target_paths:
                self._update_static_paths_mapping()
            else:
                self._update_target_mount_points()
            self._event_handler.check_timeout_and_dump()
if __name__ == '__main__':
    # Run the watcher indefinitely, recreating it after any failure; only an
    # explicit termination signal (SIGTERM/SIGINT) ends the process.
    while True:
        watcher = NFSMountWatcher(target_paths=os.getenv('CP_CAP_NFS_OBSERVER_TARGET_PATHS'))
        try:
            watcher.start()
        except BaseException as error:
            if watcher.is_terminated():
                logging.warning(format_message('Termination signal received, exiting.'))
                exit(0)
            error_message = str(error)
            logging.error(format_message('An exception occurred in the observer: {}'.format(error_message)))
            # inotify-related failures may be recoverable by raising the kernel limits
            # before the next restart.
            if 'inotify' in error_message:
                logging.error(format_message('Error is considered related to inotify, trying to increase limits...'))
                watcher.try_increase_watchers_limit()
            logging.error(format_message('Restarting the observer...'))
        finally:
            watcher.release_resources()