patches/4.0.0/main.py (134 lines of code) (raw):
from ddtrace import tracer
from unittest.mock import MagicMock
tracer.configure(writer=MagicMock())
import argparse
import json
import logging
import os
import sys
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from modular_api.helpers.constants import Env, ServiceMode
from modular_api.helpers.password_util import secure_string
from modular_api.models.audit_model import Audit
from modular_api.models.group_model import Group
from modular_api.models.policy_model import Policy
from modular_api.models.stats_model import Stats
from modular_api.models.user_model import User
from modular_api.services import SP
class DBType(str, Enum):
USERS = 'ModularUser'
STATS = 'ModularStats'
POLICIES = 'ModularPolicy'
GROUPS = 'ModularGroup'
AUDIT = 'ModularAudit'
LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'
def build_logger(name: str) -> logging.Logger:
log = logging.getLogger(name)
log.setLevel(logging.DEBUG)
h = logging.StreamHandler()
h.setFormatter(logging.Formatter(LOG_FORMAT))
log.addHandler(h)
return log
_LOG = build_logger(__name__)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Patch tinydb to mongodb')
parser.add_argument('--tinydb-root', type=Path,
default=str((Path.home() / '.modular_api/databases').resolve()),
help='Path where tinydb is (default: %(default)s)')
parser.add_argument('--mongo-uri', type=str, required=True,
help='MongoDB full uri (mongodb://use...)')
parser.add_argument('--mongo-database', type=str, required=True,
help='MongoDB database name')
parser.add_argument('--secret-key', type=str, required=True,
help='Secret key that is used for service')
parser.add_argument('--logging-filename', type=Path, required=False,
help='Filename to write logs to')
return parser
def convert_user(item: dict) -> User:
item['creation_date'] = datetime.fromtimestamp(item['creation_date'],
tz=timezone.utc).isoformat()
user = User(**item)
user.hash = SP.user_service.calculate_user_hash(user)
return user
def convert_audit(item: dict) -> Audit:
item['timestamp'] = datetime.fromtimestamp(item['timestamp'],
tz=timezone.utc).isoformat()
item.pop('hash', None)
audit = Audit(**item)
event_to_be_hashed = SP.audit_service.prepare_audit_to_be_hashed(audit=audit)
event_to_be_hashed = json.dumps(event_to_be_hashed, sort_keys=True)
audit.hash_sum = secure_string(event_to_be_hashed)
return audit
def convert_group(item: dict) -> Group:
item['creation_date'] = datetime.fromtimestamp(item['creation_date'],
tz=timezone.utc).isoformat()
gr = Group(**item)
gr.hash = SP.group_service.calculate_group_hash(gr)
return gr
def convert_policy(item: dict) -> Policy:
item['creation_date'] = datetime.fromtimestamp(item['creation_date'],
tz=timezone.utc).isoformat()
item['policy_content'] = json.dumps(item['policy_content'],
separators=(',', ':'))
pol = Policy(**item)
pol.hash = SP.policy_service.calculate_policy_hash(pol)
return pol
def convert_stat(item: dict) -> Stats:
item['type'] = 'CHAIN'
return Stats(**item)
def patch_collection(data: dict, db_type: DBType) -> None:
match db_type:
case DBType.USERS:
convertor = convert_user
case DBType.POLICIES:
convertor = convert_policy
case DBType.AUDIT:
convertor = convert_audit
case DBType.STATS:
convertor = convert_stat
case DBType.GROUPS:
convertor = convert_group
case _:
def convertor(x): return x
_LOG.info('Converting items and saving to db')
for item in (data.get('_default') or {}).values():
convertor(item).save()
def patch(tinydb_root: Path, mongo_uri: str, mongo_database: str,
secret_key: str, logging_filename: Path | None = None) -> None:
if logging_filename:
h = logging.FileHandler(logging_filename)
h.setFormatter(logging.Formatter(LOG_FORMAT))
_LOG.addHandler(h)
os.environ[Env.MONGO_DATABASE.value] = mongo_database
os.environ[Env.MONGO_URI.value] = mongo_uri
os.environ[Env.SECRET_KEY.value] = secret_key
os.environ[Env.MODE.value] = ServiceMode.ONPREM.value
if not tinydb_root.is_dir() or not tinydb_root.exists():
_LOG.error(f'{tinydb_root} must be a directory that exists')
sys.exit(1)
for filename in tinydb_root.iterdir():
try:
db_type = DBType(filename.stem)
except ValueError:
_LOG.warning(f'Not known db name: {filename.stem}')
continue
_LOG.info(f'Going to patch {filename.stem}')
try:
with open(filename, 'r') as fp:
data = json.load(fp)
except json.JSONDecodeError:
_LOG.warning('Invalid JSON inside. Skipping')
continue
try:
patch_collection(data, db_type)
except Exception:
_LOG.exception('Unexpected exception occurred. Ending patch')
sys.exit(1)
_LOG.info('Collection was patched')
def main():
arguments = build_parser().parse_args()
patch(**vars(arguments))
if __name__ == '__main__':
main()