modular.py (167 lines of code) (raw):
#!/usr/local/bin/python
import json
import os
import sys
from typing import TYPE_CHECKING, cast, Generator
import pymongo
from pymongo.operations import IndexModel
if TYPE_CHECKING:
from modular_api.models import BaseModel
from modular_api_cli.modular_cli_group.modular import modular
from modular_api.helpers.log_helper import get_logger
_LOG = get_logger('init')
class IndexesCreator:
main_index_name = 'main'
hash_key_order = pymongo.ASCENDING
range_key_order = pymongo.DESCENDING
def __init__(self, models: tuple['BaseModel', ...]):
self._models = models
@staticmethod
def _get_hash_range(model: 'BaseModel') -> tuple[str, str | None]:
h, r = None, None
for attr in model.get_attributes().values():
if attr.is_hash_key:
h = attr.attr_name
if attr.is_range_key:
r = attr.attr_name
return cast(str, h), r
@staticmethod
def _iter_indexes(model: 'BaseModel'
) -> Generator[tuple[str, str, str | None], None, None]:
"""
Yields tuples: (index name, hash_key, range_key) indexes of the given
model. Currently, only global secondary indexes are used so this
implementation wasn't tested with local ones. Uses private PynamoDB
API because cannot find public methods that can help
"""
for index in model._indexes.values():
name = index.Meta.index_name
h, r = None, None
for attr in index.Meta.attributes.values():
if attr.is_hash_key:
h = attr.attr_name
if attr.is_range_key:
r = attr.attr_name
yield name, cast(str, h), r
def _iter_all_indexes(self, model: 'BaseModel'
) -> Generator[tuple[str, str, str | None], None, None]:
yield self.main_index_name, *self._get_hash_range(model)
yield from self._iter_indexes(model)
@staticmethod
def _exceptional_indexes() -> tuple[str, ...]:
return (
'_id_',
)
def _ensure_indexes(self, model: 'BaseModel'):
table_name = model.Meta.table_name
_LOG.info(f'Going to check indexes for {table_name}')
collection = model.mongodb_handler().mongodb.collection(table_name)
existing = collection.index_information()
for name in self._exceptional_indexes():
existing.pop(name, None)
needed = {}
for name, h, r in self._iter_all_indexes(model):
needed[name] = [(h, self.hash_key_order)]
if r:
needed[name].append((r, self.range_key_order))
to_create = []
to_delete = set()
for name, data in existing.items():
if name not in needed:
to_delete.add(name)
continue
# name in needed so maybe the index is valid, and we must keep it
# or the index has changed, and we need to re-create it
if data.get('key', []) != needed[name]: # not valid
to_delete.add(name)
to_create.append(IndexModel(
keys=needed[name],
name=name
))
needed.pop(name)
for name, keys in needed.items(): # all that left must be created
to_create.append(IndexModel(
keys=keys,
name=name
))
for name in to_delete:
_LOG.info(f'Going to remove index: {name}')
collection.drop_index(name)
if to_create:
_message = ','.join(
json.dumps(i.document,
separators=(',', ':')) for i in to_create
)
_LOG.info(f'Going to create indexes: {_message}')
collection.create_indexes(to_create)
def create(self):
_LOG.debug('Going to sync indexes with code')
for model in self._models:
self._ensure_indexes(model)
def init():
"""
Separate init for docker container. This method just skips if necessary
items already exist. So, 1 exit code will be only if something really fails
:return:
"""
# TODO maybe some to click
from modular_api.helpers.password_util import generate_password, secure_string
from modular_api.services import SERVICE_PROVIDER as SP
policy = SP.policy_service.describe_policy('admin_policy')
if not policy:
_LOG.info('admin policy does not exist. Creating')
policy = SP.policy_service.create_policy_entity(
policy_name='admin_policy',
policy_content=[{
"Description": "Admin policy",
"Module": "*",
"Effect": "Allow",
"Resources": ["*"]
}]
)
policy.hash = SP.policy_service.calculate_policy_hash(
policy_item=policy
)
SP.policy_service.save_policy(policy_item=policy)
else:
_LOG.info('admin policy already exists. Skipping')
group = SP.group_service.describe_group('admin_group')
if not group:
_LOG.info('admin group does not exist. Creating')
group = SP.group_service.create_group_entity(
group_name='admin_group',
policies=['admin_policy']
)
group.hash = SP.group_service.calculate_group_hash(
group_item=group
)
SP.group_service.save_group(group_item=group)
else:
_LOG.info('admin group already exists. Skipping')
user = SP.user_service.describe_user('admin')
if not user:
_LOG.info('admin user does not exist. Creating')
user_password = os.getenv('MODULAR_API_INIT_PASSWORD')
if user_password:
is_autogenerated = False
else:
user_password = generate_password()
is_autogenerated = True
user = SP.user_service.create_user_entity(
username='admin',
password=secure_string(user_password),
group=['admin_group']
)
user.hash = SP.user_service.calculate_user_hash(user)
SP.user_service.save_user(user_item=user)
if is_autogenerated:
print(f'Autogenerated password: {user_password}')
else:
_LOG.info('admin user already exists. Skipping')
_LOG.debug('Initialization finished')
if __name__ == '__main__':
# TODO move to modular click or make normal CLI with argparse
if len(sys.argv) == 2 and sys.argv[1] == 'init':
try:
init()
except Exception:
sys.exit(1)
elif len(sys.argv) == 2 and sys.argv[1] == 'create-indexes':
from modular_api.models.audit_model import Audit
from modular_api.models.group_model import Group
from modular_api.models.policy_model import Policy
from modular_api.models.stats_model import Stats
from modular_api.models.user_model import User
from modular_api.models.refresh_token_model import RefreshToken
try:
IndexesCreator((
Audit, Group, Policy, Stats, User, RefreshToken
)).create()
except Exception:
sys.exit(1)
else:
modular()