src/main.py
#!/usr/local/bin/python
import argparse
import base64
import json
import logging.config
import multiprocessing
import secrets
import string
import sys
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Generator, Literal, cast
import pymongo
from bottle import Bottle
from dateutil.relativedelta import SU, relativedelta
from dotenv import load_dotenv
from pymongo.operations import IndexModel
from helpers import dereference_json
from helpers.__version__ import __version__
from helpers.constants import (
DEFAULT_RULES_METADATA_REPO_ACCESS_SSM_NAME,
DOCKER_SERVICE_MODE,
PRIVATE_KEY_SECRET_NAME,
CAASEnv,
HTTPMethod,
Permission,
SettingKey,
)
from onprem.api.deployment_resources_parser import (
DeploymentResourcesApiGatewayWrapper,
)
from services import SP
from services.openapi_spec_generator import OpenApiGenerator
if TYPE_CHECKING:
from models import BaseModel
SRC = Path(__file__).parent.resolve()
ROOT = SRC.parent.resolve()
DEPLOYMENT_RESOURCES_FILENAME = 'deployment_resources.json'
ACTION_DEST = 'action'
ENV_ACTION_DEST = 'env_action'
# order matters: main() builds its dispatch key from these dests
ALL_NESTING: tuple[str, ...] = (ACTION_DEST, ENV_ACTION_DEST)
RUN_ACTION = 'run'
CREATE_INDEXES_ACTION = 'create_indexes'
CREATE_BUCKETS_ACTION = 'create_buckets'
GENERATE_OPENAPI_ACTION = 'generate_openapi'
INIT_VAULT_ACTION = 'init_vault'
SET_META_REPOS_ACTION = 'set_meta_repos'
UPDATE_API_GATEWAY_MODELS_ACTION = 'update_api_models'
SHOW_PERMISSIONS_ACTION = 'show_permissions'
INIT_ACTION = 'init'
DEFAULT_HOST = '0.0.0.0'
DEFAULT_PORT = 8000
DEFAULT_NUMBER_OF_WORKERS = (multiprocessing.cpu_count() * 2) + 1
DEFAULT_API_GATEWAY_NAME = 'custodian-as-a-service-api'
SYSTEM_USER = 'system_user'
SYSTEM_CUSTOMER = 'CUSTODIAN_SYSTEM'
def gen_password(length: int = 20) -> str:
    """Generates a random password that is guaranteed to contain at least
    one lowercase letter, one uppercase letter and three digits"""
    chars = string.ascii_letters + string.digits
    while True:
        password = ''.join(secrets.choice(chars) for _ in range(length))
        if (
            any(c.islower() for c in password)
            and any(c.isupper() for c in password)
            and sum(c.isdigit() for c in password) >= 3
        ):
            break
    return password
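# Illustrative call: gen_password() returns something like
# 'xK3mPq9tB2LwZn8vA1cD' (20 chars, mixed case, at least three digits)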
logging.config.dictConfig(
{
'version': 1,
'formatters': {
'console_formatter': {'format': '%(levelname)s - %(message)s'}
},
'handlers': {
'console_handler': {
'class': 'logging.StreamHandler',
'formatter': 'console_formatter',
}
},
'loggers': {
'__main__': {'level': 'DEBUG', 'handlers': ['console_handler']}
},
}
)
_LOG = logging.getLogger(__name__)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
        description='Custodian configuration CLI entry point'
)
# -- top level sub-parser
sub_parsers = parser.add_subparsers(
dest=ACTION_DEST, required=True, help='Available actions'
)
_ = sub_parsers.add_parser(
CREATE_INDEXES_ACTION, help='Re-create MongoDB indexes'
)
_ = sub_parsers.add_parser(
INIT_VAULT_ACTION,
        help='Enables the secrets engine and creates a necessary token in Vault',
)
set_meta_parser = sub_parsers.add_parser(
SET_META_REPOS_ACTION,
        help='Sets rules metadata GitLab repositories in Vault',
)
class MetaAccessType:
def __call__(self, item: str) -> tuple[str, str]:
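            """Parses a '<project>:<secret>' pair, e.g. (values illustrative)
            'my-project:my-token' -> ('my-project', 'my-token')
            """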
res = item.strip().split(':', maxsplit=1)
if len(res) != 2:
raise ValueError('Invalid value. Must be <project>:<secret>')
return res[0], res[1]
set_meta_parser.add_argument(
'--repositories',
nargs='+',
required=True,
type=MetaAccessType(),
help='List of repositories to set for meta: '
'--repositories <project1>:<secret> <project2>:<secret>',
)
_ = sub_parsers.add_parser(
        CREATE_BUCKETS_ACTION, help='Creates necessary buckets in MinIO'
)
_ = sub_parsers.add_parser(
UPDATE_API_GATEWAY_MODELS_ACTION,
help='Regenerates API Gateway models from existing pydantic validators',
)
_ = sub_parsers.add_parser(
SHOW_PERMISSIONS_ACTION,
help='Dumps existing permissions to stdout. '
        'By default, dumps only user permissions. '
        'Use flags to dump admin permissions as well',
)
_ = sub_parsers.add_parser(
        INIT_ACTION, help='Creates the system user and sets up base settings'
)
_ = sub_parsers.add_parser(
GENERATE_OPENAPI_ACTION,
help='Generates Open API spec for Rule Engine API',
)
parser_run = sub_parsers.add_parser(RUN_ACTION, help='Run on-prem server')
parser_run.add_argument(
'-g',
'--gunicorn',
action='store_true',
default=False,
        help='Specify the flag if you want to run the server via Gunicorn',
)
parser_run.add_argument(
'-nw',
'--workers',
type=int,
required=False,
        help='Number of gunicorn workers. Has effect only '
        'if the --gunicorn flag is set',
)
parser_run.add_argument(
'--host',
default=DEFAULT_HOST,
type=str,
        help='IP address to run the server on',
)
parser_run.add_argument(
'--port',
default=DEFAULT_PORT,
type=int,
        help='Port to run the server on',
)
return parser
class ActionHandler(ABC):
@staticmethod
def is_docker() -> bool:
        # such a kludge is needed because LM and Modular use different envs
        # to indicate the on-prem mode
lm_docker = SP.environment_service.is_docker()
modular_docker = SP.modular_client.environment_service().is_docker()
return lm_docker or modular_docker
@staticmethod
def load_api_dr() -> dict:
with open(SRC / DEPLOYMENT_RESOURCES_FILENAME, 'r') as f:
data1 = json.load(f).get(DEFAULT_API_GATEWAY_NAME) or {}
with open(
SRC / 'validators' / DEPLOYMENT_RESOURCES_FILENAME, 'r'
) as f:
data2 = json.load(f).get(DEFAULT_API_GATEWAY_NAME) or {}
data1['models'] = data2.get('models') or {}
return data1
@abstractmethod
def __call__(self, **kwargs): ...
class InitVault(ActionHandler):
@staticmethod
    def generate_private_key(
        kty: Literal['EC', 'RSA'] = 'EC', crv: str = 'P-521', size: int = 4096
    ) -> str:
        """
        Generates a private key and exports it to PEM, encoding the result
        to base64
        :param kty: key type to generate, 'EC' or 'RSA'
        :param crv: curve to use if kty is 'EC'
        :param size: key size in bits if kty is 'RSA'
        :return: base64-encoded PEM with the private key
        """
from jwcrypto import jwk
match kty:
case 'EC':
key = jwk.JWK.generate(kty=kty, crv=crv)
case _: # RSA
key = jwk.JWK.generate(kty=kty, size=size)
return base64.b64encode(
key.export_to_pem(private_key=True, password=None)
).decode()
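    # A minimal sketch of the reverse operation (hypothetical 'stored_value'
    # holding the secret fetched from Vault; jwcrypto's jwk is imported in
    # the method above):
    #
    #   pem = base64.b64decode(stored_value)
    #   key = jwk.JWK.from_pem(pem)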
def __call__(self):
ssm = SP.ssm
if ssm.enable_secrets_engine():
_LOG.info('Vault engine was enabled')
else:
            _LOG.info('Vault engine has already been enabled')
if ssm.get_secret_value(PRIVATE_KEY_SECRET_NAME):
_LOG.info('Token inside Vault already exists. Skipping...')
return
ssm.create_secret(
secret_name=PRIVATE_KEY_SECRET_NAME,
secret_value=self.generate_private_key(),
)
_LOG.info('Private token was generated and set to vault')
class InitMinio(ActionHandler):
@staticmethod
def buckets() -> tuple[str, ...]:
environment = SP.environment_service
return (
environment.get_statistics_bucket_name(),
environment.get_rulesets_bucket_name(),
environment.default_reports_bucket_name(),
environment.get_metrics_bucket_name(),
)
@staticmethod
def create_bucket(name: str) -> None:
client = SP.s3
if client.bucket_exists(bucket=name):
_LOG.info(f'Bucket {name} already exists')
return
client.create_bucket(
bucket=name, region=SP.environment_service.aws_region()
)
_LOG.info(f'Bucket {name} was created')
def __call__(self):
from services.reports_bucket import (
ReportMetaBucketsKeys,
ReportsBucketKeysBuilder,
)
for name in self.buckets():
self.create_bucket(name)
        _LOG.info('Setting expiration for s3 paths')
SP.s3.put_path_expiration(
bucket=SP.environment_service.default_reports_bucket_name(),
rules=[
(ReportsBucketKeysBuilder.on_demand, 7),
(ReportMetaBucketsKeys.prefix, 7),
],
)
class InitMongo(ActionHandler):
main_index_name = 'main'
hash_key_order = pymongo.ASCENDING
range_key_order = pymongo.DESCENDING
@staticmethod
def models() -> tuple:
from models.batch_results import BatchResults
from models.event import Event
from models.job import Job
from models.job_statistics import JobStatistics
from models.metrics import ReportMetrics
from models.policy import Policy
from models.report_statistics import ReportStatistics
from models.retries import Retries
from models.role import Role
from models.rule import Rule
from models.rule_source import RuleSource
from models.ruleset import Ruleset
from models.scheduled_job import ScheduledJob
from models.setting import Setting
from models.user import User
return (
BatchResults,
Event,
Job,
JobStatistics,
Policy,
ReportStatistics,
Retries,
Role,
Rule,
RuleSource,
Ruleset,
ScheduledJob,
Setting,
User,
ReportMetrics,
)
@staticmethod
    def _get_hash_range(model: type['BaseModel']) -> tuple[str, str | None]:
h, r = None, None
for attr in model.get_attributes().values():
if attr.is_hash_key:
h = attr.attr_name
if attr.is_range_key:
r = attr.attr_name
return cast(str, h), r
@staticmethod
    def _iter_indexes(
        model: type['BaseModel'],
    ) -> Generator[tuple[str, str, str | None], None, None]:
        """
        Yields tuples (index name, hash_key, range_key) for the indexes of
        the given model. Currently, only global secondary indexes are used,
        so this implementation wasn't tested with local ones. Uses the
        private PynamoDB API because there seem to be no public methods
        that can help
        """
for index in model._indexes.values():
name = index.Meta.index_name
h, r = None, None
for attr in index.Meta.attributes.values():
if attr.is_hash_key:
h = attr.attr_name
if attr.is_range_key:
r = attr.attr_name
yield name, cast(str, h), r
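    # For a hypothetical model with a GSI 'customer-name-index' over hash key
    # 'customer' and range key 'name' this yields:
    #   ('customer-name-index', 'customer', 'name')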
    def _iter_all_indexes(
        self, model: type['BaseModel']
    ) -> Generator[tuple[str, str, str | None], None, None]:
yield self.main_index_name, *self._get_hash_range(model)
yield from self._iter_indexes(model)
@staticmethod
def _exceptional_indexes() -> tuple[str, ...]:
return (
'_id_',
'next_run_time_1', # from APScheduler
)
    def ensure_indexes(self, model: type['BaseModel']):
table_name = model.Meta.table_name
_LOG.info(f'Going to check indexes for {table_name}')
collection = model.mongodb_handler().mongodb.collection(table_name)
existing = collection.index_information()
for name in self._exceptional_indexes():
existing.pop(name, None)
needed = {}
for name, h, r in self._iter_all_indexes(model):
needed[name] = [(h, self.hash_key_order)]
if r:
needed[name].append((r, self.range_key_order))
to_create = []
to_delete = set()
for name, data in existing.items():
if name not in needed:
to_delete.add(name)
continue
            # the name is in needed, so either the index is still valid and
            # we must keep it, or its keys have changed and we need to
            # re-create it
if data.get('key', []) != needed[name]: # not valid
to_delete.add(name)
to_create.append(IndexModel(keys=needed[name], name=name))
needed.pop(name)
        for name, keys in needed.items():  # whatever is left must be created
to_create.append(IndexModel(keys=keys, name=name))
for name in to_delete:
_LOG.info(f'Going to remove index: {name}')
collection.drop_index(name)
if to_create:
_message = ','.join(
json.dumps(i.document, separators=(',', ':'))
for i in to_create
)
_LOG.info(f'Going to create indexes: {_message}')
collection.create_indexes(to_create)
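    # Illustrative run with a hypothetical 'cn' index: if the code defines
    # 'cn' over [('customer', 1), ('name', -1)] while Mongo has 'cn' over
    # [('customer', 1)] only, 'cn' is dropped and re-created with the new
    # key spec; indexes existing only in Mongo are dropped, and those
    # defined only in code are created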
def __call__(self):
_LOG.debug('Going to sync indexes with code')
models = []
if SP.environment_service.is_docker():
models.extend(self.models())
for model in models:
self.ensure_indexes(model)
class Run(ActionHandler):
@staticmethod
def make_app(dp_wrapper: DeploymentResourcesApiGatewayWrapper) -> Bottle:
"""For gunicorn"""
from onprem.api.app import OnPremApiBuilder
builder = OnPremApiBuilder(dp_wrapper=dp_wrapper)
return builder.build()
def __call__(
self,
host: str = DEFAULT_HOST,
port: int = DEFAULT_PORT,
gunicorn: bool = False,
workers: int | None = None,
):
self._host = host
self._port = port
if not gunicorn and workers:
_LOG.warning(
'--workers is ignored because you are not running Gunicorn'
)
from onprem.api.cron_jobs import ensure_all
if CAASEnv.SERVICE_MODE.get() != DOCKER_SERVICE_MODE:
CAASEnv.SERVICE_MODE.set(DOCKER_SERVICE_MODE)
dr_wrapper = DeploymentResourcesApiGatewayWrapper(self.load_api_dr())
app = self.make_app(dr_wrapper)
SP.ap_job_scheduler.start()
ensure_all()
# ensure_retry_job()
if gunicorn:
workers = workers or DEFAULT_NUMBER_OF_WORKERS
from onprem.api.app_gunicorn import CustodianGunicornApplication
            # todo allow specifying these settings from outside
            options = {
                'bind': f'{host}:{port}',
                'workers': workers,
                'timeout': 60,  # workers silent for longer are restarted
                'max_requests': 512,  # periodic worker restarts mitigate leaks
                'max_requests_jitter': 64,  # spread restarts across workers
            }
CustodianGunicornApplication(app, options).run()
else:
app.run(host=host, port=port)
class UpdateApiGatewayModels(ActionHandler):
"""
Updates ./validators/deployment_resources.json and
./deployment_resources.json
"""
@property
def validators_module(self) -> str:
return 'validators'
@property
def models_deployment_resources(self) -> Path:
return SRC / self.validators_module / DEPLOYMENT_RESOURCES_FILENAME
    @property
    def main_deployment_resources(self) -> Path:
        return SRC / DEPLOYMENT_RESOURCES_FILENAME
@property
def custodian_api_gateway_name(self) -> str:
return 'custodian-as-a-service-api'
@property
def custodian_api_definition(self) -> dict:
return {
self.custodian_api_gateway_name: {
'resource_type': 'api_gateway',
'dependencies': [],
'resources': {},
'models': {},
}
}
def __call__(self, **kwargs):
from validators import registry
api_def = self.custodian_api_definition
for model in registry.iter_models(without_get=True):
schema = model.model_json_schema()
dereference_json(schema)
schema.pop('$defs', None)
api_def[self.custodian_api_gateway_name]['models'].update(
{
model.__name__: {
'content_type': 'application/json',
'schema': schema,
}
}
)
path = self.models_deployment_resources
_LOG.info(f'Updating {path}')
with open(path, 'w') as file:
json.dump(api_def, file, indent=2, sort_keys=True)
_LOG.info(f'{path} has been updated')
        # Here we update the API Gateway definition inside the main
        # deployment resources. We don't remove existing endpoints; we only
        # add new ones if they are defined in RequestModelRegistry and are
        # absent from the deployment resources. We also update request and
        # response models. The default lambda is
        # caas-configuration-api-handler. Change it if it's wrong
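        # The resulting entry looks roughly like this (illustrative path and
        # model name):
        #   "/jobs": {
        #     "policy_statement_singleton": true,
        #     "enable_cors": true,
        #     "POST": {
        #       "integration_type": "lambda",
        #       "enable_proxy": true,
        #       "lambda_name": "caas-configuration-api-handler",
        #       "method_request_models": {"application/json": "JobPostModel"}
        #     }
        #   }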
        path = self.main_deployment_resources
_LOG.info(f'Updating {path}')
with open(path, 'r') as file:
deployment_resources = json.load(file)
api = deployment_resources.get(self.custodian_api_gateway_name)
if not api:
_LOG.warning('Api gateway not found in deployment_resources')
return
resources = api.setdefault('resources', {})
for item in registry.iter_all():
            # if the endpoint & method are already defined, just update the
            # models; otherwise add the whole configuration
data = resources.setdefault(
item.path,
{'policy_statement_singleton': True, 'enable_cors': True},
).setdefault(
item.method.value,
{
'integration_type': 'lambda',
'enable_proxy': True,
'lambda_alias': '${lambdas_alias_name}',
'authorization_type': 'authorizer'
if item.auth
else 'NONE',
'lambda_name': 'caas-configuration-api-handler',
},
)
data.pop('method_request_models', None)
data.pop('responses', None)
data.pop('method_request_parameters', None)
if model := item.request_model:
match item.method:
case HTTPMethod.GET:
params = {}
for name, info in model.model_fields.items():
params[f'method.request.querystring.{name}'] = (
info.is_required()
)
data['method_request_parameters'] = params
case _:
data['method_request_models'] = {
'application/json': model.__name__
}
responses = []
for st, m, description in item.responses:
resp = {'status_code': str(st.value)}
if m:
resp['response_models'] = {'application/json': m.__name__}
responses.append(resp)
data['responses'] = responses
with open(path, 'w') as file:
json.dump(deployment_resources, file, indent=2)
_LOG.info(f'{path} has been updated')
class InitAction(ActionHandler):
def __call__(self):
from models.setting import Setting
if not Setting.get_nullable(SettingKey.SYSTEM_CUSTOMER):
_LOG.info('Setting system customer name')
Setting(
name=CAASEnv.SYSTEM_CUSTOMER_NAME.value, value=SYSTEM_CUSTOMER
).save()
if not Setting.get_nullable(
SettingKey.REPORT_DATE_MARKER.value
): # todo redesign
_LOG.info('Setting report date marker')
Setting(
name=SettingKey.REPORT_DATE_MARKER.value,
value={
'last_week_date': (
datetime.today() + relativedelta(weekday=SU(-1))
)
.date()
.isoformat(),
'current_week_date': (
datetime.today() + relativedelta(weekday=SU(0))
)
.date()
.isoformat(),
},
).save()
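        # note: relativedelta(weekday=SU(-1)) above resolves to the most
        # recent Sunday (today if today is Sunday), while SU(0) behaves like
        # SU(+1) and resolves to the upcoming Sunday. E.g. for Wednesday
        # 2024-01-10 the dates are 2024-01-07 and 2024-01-14 respectively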
Setting(name=SettingKey.SEND_REPORTS, value=True).save()
users_client = SP.users_client
if not users_client.get_user_by_username(SYSTEM_USER):
_LOG.info('Creating a system user')
password = CAASEnv.SYSTEM_USER_PASSWORD.get(None)
from_env = bool(password)
if not from_env:
password = gen_password()
users_client.signup_user(
username=SYSTEM_USER,
password=password,
customer=SYSTEM_CUSTOMER,
)
            if not from_env:
                print(f'System user ({SYSTEM_USER}) password: {password}')
            else:
                print(f'System user ({SYSTEM_USER}) was created')
else:
_LOG.info('System user already exists')
_LOG.info('Done')
class GenerateOpenApi(ActionHandler):
def __call__(self):
from validators import registry
data = self.load_api_dr()
generator = OpenApiGenerator(
title='Rule Engine - OpenAPI 3.0',
description='Rule engine rest api',
url=f'http://{DEFAULT_HOST}:{DEFAULT_PORT}',
stages=DeploymentResourcesApiGatewayWrapper(data).stage,
version=__version__,
endpoints=registry.iter_all(),
)
json.dump(generator.generate(), sys.stdout, separators=(',', ':'))
class ShowPermissions(ActionHandler):
def __call__(self):
json.dump(sorted(Permission.iter_enabled()), sys.stdout, indent=4)
class SetMetaRepos(ActionHandler):
def __call__(self, repositories: list[tuple[str, str]]):
ssm = SP.ssm
ssm.create_secret(
secret_name=DEFAULT_RULES_METADATA_REPO_ACCESS_SSM_NAME,
secret_value=[
{
'project': i[0],
'ref': 'main',
'secret': i[1],
'url': 'https://git.epam.com',
}
for i in repositories
],
)
_LOG.info('Repositories were set')
def main(args: list[str] | None = None):
parser = build_parser()
arguments = parser.parse_args(args)
key = tuple(
getattr(arguments, dest)
for dest in ALL_NESTING
if hasattr(arguments, dest)
)
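    # e.g. 'python main.py run' yields key == ('run',), which maps to Run()
    # in the dict below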
mapping: dict[tuple[str, ...], Callable | ActionHandler] = {
(INIT_VAULT_ACTION,): InitVault(),
(SET_META_REPOS_ACTION,): SetMetaRepos(),
(CREATE_INDEXES_ACTION,): InitMongo(),
(CREATE_BUCKETS_ACTION,): InitMinio(),
(GENERATE_OPENAPI_ACTION,): GenerateOpenApi(),
(RUN_ACTION,): Run(),
(UPDATE_API_GATEWAY_MODELS_ACTION,): UpdateApiGatewayModels(),
(SHOW_PERMISSIONS_ACTION,): ShowPermissions(),
(INIT_ACTION,): InitAction(),
}
    func: Callable = mapping.get(key) or (
        lambda **kwargs: _LOG.error('No handler registered for this action')
    )
for dest in ALL_NESTING:
if hasattr(arguments, dest):
delattr(arguments, dest)
load_dotenv(verbose=True)
try:
func(**vars(arguments))
except Exception as e:
_LOG.error(f'Unexpected exception occurred: {e}')
        sys.exit(1)  # non-zero exit code so entrypoint.sh can detect failures such as connection errors
if __name__ == '__main__':
main()