docker/services/algorithm_service.py (113 lines of code) (raw):
from datetime import datetime
from typing import Type
from bson import ObjectId
from bson.errors import InvalidId
from mongoengine import DoesNotExist, ValidationError, EmbeddedDocument
from commons.constants import CLUSTERING_SETTINGS_ATTR, METRIC_FORMAT_ATTR, \
RECOMMENDATION_SETTINGS_ATTR, ALGORITHM_ATTR
from commons.log_helper import get_logger
from models.algorithm import Algorithm, RecommendationSettings, \
ClusteringSettings, MetricFormatSettings
_LOG = get_logger('r8s-algorithm-service')
class AlgorithmService:
@staticmethod
def list():
return list(Algorithm.objects.all())
def get(self, identifier: str):
_LOG.debug(f'Describing algorithm by identifier: \'{identifier}\'')
try:
_LOG.debug(f'Trying to convert to bson id')
ObjectId(identifier)
_LOG.debug(f'Describing algorithm by id')
return self.get_by_id(object_id=identifier)
except InvalidId:
_LOG.debug(f'Describing algorithm by name')
return self.get_by_name(name=identifier)
@staticmethod
def get_by_id(object_id):
try:
return Algorithm.objects.get(id=object_id)
except (DoesNotExist, ValidationError):
return None
@staticmethod
def get_by_name(name: str):
try:
return Algorithm.objects.get(name=name)
except (DoesNotExist, ValidationError):
return None
@staticmethod
def create(algorithm_data: dict):
return Algorithm(**algorithm_data)
@staticmethod
def save(algorithm: Algorithm):
algorithm.last_modified = datetime.utcnow()
algorithm.md5 = algorithm.get_checksum()
algorithm.save()
@staticmethod
def delete(algorithm: Algorithm):
algorithm.delete()
def update_from_licensed_job(self, algorithm: Algorithm,
licensed_job: dict):
licensed_algorithm_data = licensed_job.get(ALGORITHM_ATTR)
cur_clust_settings = dict(algorithm.clustering_settings.to_mongo())
cur_rec_settings = dict(algorithm.recommendation_settings.to_mongo())
clustering_settings = licensed_algorithm_data.get(
CLUSTERING_SETTINGS_ATTR)
recommendation_settings = licensed_algorithm_data.get(
RECOMMENDATION_SETTINGS_ATTR)
if cur_clust_settings != clustering_settings:
_LOG.warning(f'Updating algorithm \'{algorithm.name}\' '
f'clustering settings.')
self.update_clustering_settings(
algorithm=algorithm,
clustering_settings=clustering_settings
)
if cur_rec_settings != recommendation_settings:
_LOG.warning(f'Updating algorithm \'{algorithm.name}\' '
f'recommendation settings.')
self.update_recommendation_settings(
algorithm=algorithm,
recommendation_settings=recommendation_settings
)
def update_clustering_settings(self, algorithm: Algorithm,
clustering_settings: dict):
return self._update_embedded_document(
algorithm=algorithm,
attr_name=CLUSTERING_SETTINGS_ATTR,
value=clustering_settings,
document_class=ClusteringSettings
)
def update_metric_format_settings(self, algorithm: Algorithm,
metric_format_settings: dict):
return self._update_embedded_document(
algorithm=algorithm,
attr_name=METRIC_FORMAT_ATTR,
value=metric_format_settings,
document_class=MetricFormatSettings
)
def update_recommendation_settings(self, algorithm: Algorithm,
recommendation_settings: dict):
return self._update_embedded_document(
algorithm=algorithm,
attr_name=RECOMMENDATION_SETTINGS_ATTR,
value=recommendation_settings,
document_class=RecommendationSettings
)
@staticmethod
def _update_embedded_document(algorithm: Algorithm,
document_class: Type[EmbeddedDocument],
attr_name: str, value: dict):
try:
document = algorithm.__getattribute__(attr_name)
except AttributeError:
_LOG.error(f'Attribute \'{attr_name}\' does not exist '
f'in algorithm model.')
return
if document:
document_dict = dict(document.to_mongo())
else:
document_dict = {}
document_dict.update(value)
document = document_class(**document_dict)
algorithm.__setattr__(attr_name, document)
return algorithm