modular_api/services/usage_service.py (196 lines of code) (raw):
import hashlib
import json
import os
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from pynamodb.pagination import ResultIterator
from bottle import LocalRequest
from modular_api.helpers.constants import (
MODULES_DIR, API_MODULE_FILE, M_POINT, COMMAND,
GROUP, DATE, META, STATUS, EVENT_TYPE, EVENT_TYPE_API, PRODUCT,
JOB_ID, DATE_FORMAT, MODULE_NAME)
from modular_api.helpers.log_helper import get_logger
from modular_api.models.stats_model import Stats
_LOG = get_logger(__name__)
class AbstractUsageService(ABC):
    """Interface that every usage-statistics service has to implement."""

    @abstractmethod
    def save_stats(self, request: LocalRequest, response: dict) -> None:
        """
        Save one statistics item for the given request/response pair.

        Template of an item saved for an API event:
        api_stats_item = {
            "date": str,
            "mount_point": str,
            "group": str,
            "command": str,
            "meta": dict,
            "status": str,
            "event_type": str,
            "product": str,
            "timestamp": int,
            "id": str
        }
        Template of an item saved for an asynchronous job event:
        async_job_stats_item = {
            "date": str,
            "mount_point": str,
            "meta": dict,
            "status": str,
            "event_type": str,
            "product": str,
            "job_id": str,
            "timestamp": int,
            "id": str
        }
        Important: the order of the keys in these dicts must stay as shown.
        """

    @abstractmethod
    def get_stats(self, module: str, from_date: int, to_date: int) -> list:
        """
        Retrieve previously saved statistics for ``module``, bounded by
        ``from_date`` and ``to_date``.
        """
class UsageService(AbstractUsageService):
    """
    Usage-statistics service that persists events as ``Stats`` items.

    Each item receives an ``id`` that is the MD5 hex digest of a sum of
    timestamps: its own timestamp, the timestamp of the previously stored
    item (or the start of the current month when there is no previous item
    in the same month) and the start of the next month. When a new item is
    appended within the same month, the id of the previously stored item is
    recomputed so that it refers to the new item's timestamp instead of the
    next-month boundary (see ``save_stats``).
    """

    def __init__(self):
        # {mount_point: module_name} collected from installed module
        # descriptors.
        self.modules_info = self.__get_installed_modules_info()
        # Timestamp and date of the most recently stored item; refreshed by
        # __make_stats_item whenever such an item is found.
        self.last_rec_ts = None
        self.last_rec_date = None

    @staticmethod
    def _get_two_last() -> tuple[Stats | None, Stats | None]:
        """
        Query at most two items with hash key 'CHAIN' from
        ``Stats.type_timestamp_index`` in reverse index order.

        :return: ``(first, second)`` in the order returned by the query;
            missing positions are filled with ``None``.
        """
        items = tuple(Stats.type_timestamp_index.query(
            hash_key='CHAIN',
            scan_index_forward=False,
            limit=2
        ))
        if len(items) == 2:
            return items
        if len(items) == 1:
            return items[0], None
        return None, None

    @staticmethod
    def _get_last() -> Stats | None:
        """
        Return the first item with hash key 'CHAIN' from
        ``Stats.type_timestamp_index`` in reverse index order, or ``None``
        when the query yields nothing.
        """
        return next(Stats.type_timestamp_index.query(
            hash_key='CHAIN',
            scan_index_forward=False,
            limit=1
        ), None)

    @staticmethod
    def _hex_digest(value) -> str:
        """
        Return the MD5 hex digest of ``value`` serialized as JSON with
        sorted keys and encoded as UTF-8.
        """
        return hashlib.md5(
            json.dumps(value, sort_keys=True).encode('utf-8')).hexdigest()

    # class entity initialization helpers =====================================
    def __resolve_module_mount_point(self, request: LocalRequest) -> str:
        """
        Resolve mount point of module from the request. Obtained mount point
        is equal to mount points in Modular-API meta description.

        :param request: incoming request; only ``request.urlparts.path`` is
            read
        :return: the leading path segment (with its leading slash) when it
            is a known mount point, otherwise '/'
        """
        raw_path = request.urlparts.path
        idx_of_second_slash = raw_path.find('/', 1)
        if idx_of_second_slash == -1:
            # No second slash: the whole path is the candidate mount point.
            # Slicing with -1 here would silently drop the last character.
            mount_point = raw_path
        else:
            mount_point = raw_path[:idx_of_second_slash]
        if mount_point not in self.modules_info:
            return '/'
        return mount_point

    @staticmethod
    def __start_month_ts(date: str, next_month=False) -> int:
        """
        Return the POSIX timestamp (whole seconds) of the first instant of
        the month that ``date`` falls into, or of the following month.

        :param date: date string in ``DATE_FORMAT``
        :param next_month: when true, return the start of the month after
            the one ``date`` belongs to
        :return: timestamp in seconds
        """
        # NOTE(review): if DATE_FORMAT carries no UTC offset, strptime
        # returns a naive datetime and astimezone() interprets it in the
        # system local time zone before converting to UTC. The dates passed
        # in are produced from UTC "now" - confirm this is intended.
        date_time = datetime.strptime(date, DATE_FORMAT).astimezone(
            timezone.utc)
        month_start = date_time.replace(day=1, hour=0, minute=0,
                                        second=0, microsecond=0)
        if not next_month:
            return int(month_start.timestamp())
        if month_start.month == 12:
            # December rolls over to January of the next year.
            next_month_start = month_start.replace(
                year=month_start.year + 1, month=1)
        else:
            next_month_start = month_start.replace(
                month=month_start.month + 1)
        return int(next_month_start.timestamp())

    def __make_previous_item_id(self, date, timestamp):
        """
        Compute the new id for the most recently stored item, given that an
        item with ``timestamp`` is about to be stored after it.

        :param date: date string (``DATE_FORMAT``) of the item being added
        :param timestamp: timestamp of the item being added
        :return: MD5 hex digest to be used as the stored item's new id
        """
        # NOTE(review): both records are dereferenced without a None check;
        # this relies on the caller invoking the method only when stored
        # items exist - confirm.
        record, prev_record = self._get_two_last()
        record_ts = record.timestamp
        month_start_ts = self.__start_month_ts(date)
        # Id the stored item would carry if it was computed from the month
        # start rather than from a preceding item's timestamp.
        start_id = (month_start_ts + record_ts +
                    self.__start_month_ts(date, next_month=True))
        if record.id == self._hex_digest(start_id):
            item_id = month_start_ts + record_ts + timestamp
        else:
            item_id = prev_record.timestamp + record_ts + timestamp
        return self._hex_digest(item_id)

    @staticmethod
    def __get_installed_modules_info() -> dict:
        """
        Return all Modular-API`s installed modules info
        installed_modules_info_model = {
            mount_point: module_name
        }

        The modules directory is created when it does not exist yet.
        Entries that are not directories, or that lack the API descriptor
        file, are skipped.
        """
        modules_path = Path(__file__).parent.parent / MODULES_DIR
        # exist_ok avoids failing when the directory appears between a
        # separate existence check and the creation call.
        modules_path.mkdir(parents=True, exist_ok=True)
        installed_modules_info = {}
        for module in modules_path.iterdir():
            api_file_path = module / API_MODULE_FILE
            if not module.is_dir() or not api_file_path.exists():
                continue
            with open(api_file_path, 'r', encoding='utf-8') as file:
                module_descriptor = json.load(file)
            mount_point = module_descriptor.get(M_POINT)
            module_name = module_descriptor.get(MODULE_NAME)
            installed_modules_info[mount_point] = module_name
        return installed_modules_info

    # make stats item =========================================================
    def __make_stats_item(self, mount_point, group, command, meta, status,
                          event_type, product,
                          job_id) -> tuple[dict, int, str, str | None]:
        """
        Build the statistics document for a new event.

        Also refreshes ``self.last_rec_ts`` / ``self.last_rec_date`` from
        the most recently stored item, when there is one.

        :return: ``(stats_doc, current_ts, date, prev_item_id)`` where
            ``prev_item_id`` is the recomputed id for the previously stored
            item, or ``None`` when that item must not be touched
        """
        last = self._get_last()
        if last:
            self.last_rec_ts = last.timestamp
            self.last_rec_date = last.date

        utc_time_now = datetime.now(timezone.utc)
        # Whole seconds scaled by 1000; the last three digits are filled in
        # below from the content hash, not from the clock.
        ts = int(utc_time_now.timestamp()) * 1000
        date = utc_time_now.strftime(DATE_FORMAT)
        prev_item_id = None

        # Key order matters here - do not reorder.
        stats_doc = {
            DATE: date,
            M_POINT: mount_point,
            GROUP: group,
            COMMAND: command,
            META: meta,
            STATUS: status,
            EVENT_TYPE: event_type,
            PRODUCT: product
        }
        if event_type != EVENT_TYPE_API:
            # Non-API events carry a job id instead of group/command.
            for key in [GROUP, COMMAND]:
                stats_doc.pop(key)
            stats_doc.update({JOB_ID: job_id})

        content_hash = self._hex_digest(stats_doc)
        current_ts = ts + (int(content_hash, 16) % 1000)
        if current_ts == self.last_rec_ts:
            # Do not reuse the timestamp of the last stored item.
            current_ts += 2
        stats_doc.update(timestamp=current_ts)

        next_month_start_ts = self.__start_month_ts(date, next_month=True)
        if (not self.last_rec_date or
                (self.__start_month_ts(date) != self.__start_month_ts(
                    self.last_rec_date))):
            # No stored item yet, or the last one belongs to another month:
            # the id is derived from the start of the current month.
            item_id = (self.__start_month_ts(date) + current_ts +
                       next_month_start_ts)
        else:
            # Same month as the last stored item: derive the id from its
            # timestamp and recompute that item's own id as well.
            item_id = self.last_rec_ts + current_ts + next_month_start_ts
            prev_item_id = self.__make_previous_item_id(date=date,
                                                        timestamp=current_ts)
        stats_doc.update(id=self._hex_digest(item_id))
        return stats_doc, current_ts, date, prev_item_id

    # main methods ============================================================
    def save_stats(self, request: LocalRequest, payload: dict) -> None:
        """
        Build a statistics item for the handled request and save it.

        Group and command names are taken from the request path: with three
        or more path segments the last two are group and command, with
        exactly two segments only the command (second segment) is set,
        otherwise both are ``None``.

        :param request: handled request; ``request.urlparts.path`` and
            ``request.path`` are read
        :param payload: dict from which the STATUS, META, EVENT_TYPE,
            PRODUCT and JOB_ID keys are read. A status of 'success' or
            'succeeded' (case-insensitive) is stored as 'SUCCESS', anything
            else as 'FAILED'. When no product is given, the event is stored
            as an API event with the product resolved from the mount point.
        """
        module_mount_point = self.__resolve_module_mount_point(request)
        parts = request.path.strip('/').split('/')
        if len(parts) >= 3:
            *_, group_name, command_name = parts
        elif len(parts) == 2:
            group_name, command_name = None, parts[1]
        else:
            group_name, command_name = None, None

        exec_status = payload.get(STATUS)
        meta = payload.get(META)
        event_type = payload.get(EVENT_TYPE)
        product = payload.get(PRODUCT)
        job_id = payload.get(JOB_ID)

        if exec_status and exec_status.lower() in ('success', 'succeeded'):
            status = 'SUCCESS'
        else:
            status = 'FAILED'

        if not product:
            event_type = EVENT_TYPE_API
            product = self.modules_info.get(module_mount_point)

        stats_doc, current_event_ts, current_event_date, prev_item_id = (
            self.__make_stats_item(module_mount_point, group_name,
                                   command_name, meta, status, event_type,
                                   product, job_id))
        if prev_item_id:
            _LOG.info(f'Updating previous item. Item ID: {prev_item_id}')
            # The stored item is deleted and saved again under the new id.
            # NOTE(review): `last` is assumed to still exist at this point.
            last = self._get_last()
            last.delete()
            last.id = prev_item_id
            last.save()
        _LOG.info(f'Saving usage event. Last timestamp: {self.last_rec_ts},'
                  f'current timestamp: {current_event_ts}')
        Stats(**stats_doc).save()
        _LOG.info('Usage event saved')

    def get_stats(self, module: str, from_date: int | None = None,
                  to_date: int | None = None) -> ResultIterator[Stats]:
        """
        Query statistics items of one module, optionally bounded by
        timestamp.

        :param module: hash key for ``Stats.mount_point_timestamp_index``
        :param from_date: lower timestamp bound; a falsy value (``None`` or
            0) means no lower bound
        :param to_date: upper timestamp bound; a falsy value means no upper
            bound. When only ``to_date`` is given the bound is exclusive;
            when both bounds are given ``between`` is used.
        :return: iterator over the matching ``Stats`` items
        """
        # Starting from None relies on PynamoDB conditions accepting
        # `None & condition` - presumably yielding the condition itself.
        rkc = None
        if from_date and to_date:
            rkc &= Stats.timestamp.between(from_date, to_date)
        elif from_date:
            rkc &= (Stats.timestamp >= from_date)
        elif to_date:
            rkc &= (Stats.timestamp < to_date)
        return Stats.mount_point_timestamp_index.query(
            hash_key=module,
            range_key_condition=rkc
        )