modular_sdk/services/settings_management_service.py (221 lines of code) (raw):
import json
import os
import re
from modular_sdk.commons import ModularException
from modular_sdk.commons.error_helper import RESPONSE_BAD_REQUEST_CODE
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.models.setting import Setting
_LOG = get_logger(__name__)
class SettingsManagementService:
def __init__(self, group_name: str):
self.group_name = group_name.upper()
def create_setting_item(self, setting_key, setting_value):
if isinstance(setting_key, str):
setting_key = setting_key.upper()
self._validate_setting_key(setting_key=setting_key)
setting_value = self._validate_setting_value(
setting_key=setting_key,
setting_value=setting_value
)
if self.describe_setting_item(setting_key=setting_key):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'{setting_key} setting already exists. Please use '
f'\'update\' command instead.'
)
setting_item = Setting(name=setting_key,
value=setting_value)
setting_item.save()
def update_setting_item(self, setting_key, setting_value):
if isinstance(setting_key, str):
setting_key = setting_key.upper()
self._validate_setting_key(setting_key=setting_key)
setting_value = self._validate_setting_value(
setting_key=setting_key,
setting_value=setting_value
)
if not self.describe_setting_item(setting_key=setting_key):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'{setting_key} setting does not exist. Please use '
f'\'add\' command first.'
)
setting_item = Setting(name=setting_key,
value=setting_value)
setting_item.save()
def describe_setting_item(self, setting_key):
if isinstance(setting_key, str):
setting_key = setting_key.upper()
self._validate_setting_key(setting_key=setting_key)
return Setting.get_nullable(setting_key)
def list_setting_items(self):
return list(Setting.scan(
filter_condition=(Setting.name.startswith(self.group_name))
))
def delete_setting_item(self, setting_key):
if isinstance(setting_key, str):
setting_key = setting_key.upper()
self._validate_setting_key(setting_key)
if not Setting.get_nullable(setting_key):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'{setting_key} setting not found.'
)
Setting(name=setting_key).delete()
def _validate_setting_key(self, setting_key: str):
regex = r"^{0}(?![\da-zA-Z])".format(self.group_name)
if not re.match(regex, setting_key):
_LOG.error('Invalid setting name')
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'{setting_key} setting does not belongs to '
f'{self.group_name} group'
)
def _validate_setting_value(self, setting_key: str, setting_value: str):
# bool validation
boolean_markers = ['_ENABLED', '_DISABLED', '_ACTIVE',
'_RUNNING', '_AVAILABLE'] # + '_IS_', '_ARE_'
for mark in boolean_markers:
if setting_key.endswith(mark) or '_IS_' in setting_key:
return self.__process_boolean_value(setting_value)
if '_ARE_' in setting_key:
return self.__process_boolean_value(setting_value)
# list validation
if setting_key.endswith('_LIST'):
return self.__process_list_value(setting_value)
# map validation
map_markers = ['_JSON', '_MAP', '_MAPPING']
for mark in map_markers:
if setting_key.endswith(mark):
return self.__process_map_value(setting_value)
# integer validation
int_markers = ['_COUNT', '_THRESHOLD', '_LONG', '_INT']
for mark in int_markers:
if setting_key.endswith(mark):
return self.__process_integer_value(setting_value)
if setting_key.endswith('_EXPIRATION'):
return self.__process_milliseconds_value(setting_value)
# email validation
if setting_key.endswith('_MAIL'):
return self.__process_email_value(setting_value)
# url validation
if setting_key.endswith('_URL'):
return self.__process_url_value(setting_value)
# for all other cases
return self.__process_regular_value(setting_value)
@staticmethod
def __process_boolean_value(setting_value):
if setting_value.lower() not in ('true', 'false'):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the one from the following '
f'patterns: *_IS_* / *_ARE_* / *_ENABLED / *_DISABLED / '
f'*_ACTIVE / *_RUNNING / *_AVAILABLE{os.linesep}'
f'Invalid value provided for bool type setting. '
f'Expected "True" or "False", case-insensitive. '
)
setting_value = True if setting_value.lower() == 'true' else False
return setting_value
@staticmethod
def __process_email_value(setting_value):
import re
if not re.match(
r'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)',
setting_value
):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the pattern: *_MAIL{os.linesep}'
f'Invalid email provided, check spelling.'
)
return setting_value
@staticmethod
def __process_milliseconds_value(setting_value):
invalid_length = False
if len(setting_value) != 13:
invalid_length = True
try:
import datetime
if invalid_length:
raise ValueError()
if setting_value.startswith('0'):
raise ValueError()
timestamp_in_seconds = int(setting_value) / 1000
if timestamp_in_seconds < 0:
raise ValueError()
datetime.datetime.fromtimestamp(timestamp_in_seconds)
return int(setting_value)
except (TypeError, ValueError, OSError):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the pattern: *_EXPIRATION{os.linesep}'
f'Invalid value provided for expiration setting. '
f'Expected EPOCH milliseconds format, 13 digits, '
f'positive integer number'
)
@staticmethod
def __process_url_value(setting_value):
try:
from urllib.parse import urlparse
result = urlparse(setting_value)
if not all([result.scheme, result.netloc]):
raise AssertionError()
return setting_value
except AssertionError:
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the pattern: *_URL{os.linesep}'
f'Invalid value provided for URL link, check spelling. '
f'Format: <$protocol>://<$net_location>'
)
@staticmethod
def __process_integer_value(setting_value):
try:
setting_value = int(setting_value)
return setting_value
except (TypeError, ValueError):
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the one from the following '
f'patterns: *_COUNT, *_THRESHOLD, *_LONG, *_INT{os.linesep}'
f'Invalid value provided. Expected integer value.'
)
@staticmethod
def __process_map_value(setting_value):
if isinstance(setting_value, dict):
return setting_value
try:
result = json.loads(setting_value)
except json.JSONDecodeError:
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Setting name matches to the one from the following '
f'patterns: *_JSON / *_MAP / *_MAPPING{os.linesep}'
f'Value format is \'Escaped string\'.{os.linesep}'
f'Invalid JSON object provided.'
)
return result
@staticmethod
def __process_list_value(setting_value):
if isinstance(setting_value, list):
return setting_value
result_value = []
setting_value_list = setting_value.split(',')
for item in setting_value_list:
value = item.strip()
try:
result_value.append(int(value))
continue
except ValueError:
pass
try:
result_value.append(float(value))
continue
except ValueError:
pass
result_value.append(value)
return result_value
@staticmethod
def __process_regular_value(setting_value):
try:
return int(setting_value)
except ValueError:
pass
try:
return float(setting_value)
except ValueError:
pass
return setting_value