modular_sdk/services/impl/maestro_credentials_service.py (477 lines of code) (raw):

import base64 import dataclasses import io import json import tempfile import uuid from contextlib import contextmanager from functools import cached_property from pathlib import Path from time import time from typing import Optional, Union, Dict, Callable, TypeVar, TypedDict, Literal from botocore.exceptions import ClientError from urllib3.util import parse_url, Url from modular_sdk.commons import DataclassBase from modular_sdk.commons.constants import DEFAULT_AWS_REGION, \ ENV_AZURE_SUBSCRIPTION_ID, ENV_CLOUDSDK_CORE_PROJECT, HTTPS_ATTR, \ HTTP_ATTR, Cloud, ApplicationType from modular_sdk.commons.log_helper import get_logger from modular_sdk.models.application import Application from modular_sdk.models.parent import Parent from modular_sdk.models.tenant import Tenant from modular_sdk.modular import Modular from modular_sdk.services.application_service import ApplicationService from modular_sdk.services.environment_service import EnvironmentService, \ EnvironmentContext from modular_sdk.services.parent_service import ParentService from modular_sdk.services.ssm_service import AbstractSSMClient from modular_sdk.services.sts_service import StsService from modular_sdk.services.tenant_service import TenantService # GOOGLE MA_SSM_PROJECT_ID = 'project_id' _LOG = get_logger(__name__) @dataclasses.dataclass() class AccessMeta(DataclassBase): """ Common model to keep access data. It works this way: >>> meta = AccessMeta.from_dict({}) >>> meta.update_host(host='https://epam.com/hello') >>> meta.dict() {'host': 'epam.com', 'stage': 'hello', 'port': 443, 'protocol': 'HTTPS'} >>> meta.update_host(host='https://epam.com/hello', port=80, protocol='http', stage='/dev') >>> meta.dict() {'host': 'epam.com', 'stage': 'dev', 'port': 80, 'protocol': 'HTTP'} """ host: Optional[str] stage: Optional[str] # path prefix without "/" in case it exists port: Optional[int] protocol: Optional[Literal['HTTP', 'HTTPS']] def update_host(self, host: Optional[str] = None, port: Optional[int] = None, protocol: Optional[str] = None, stage: Optional[str] = None): """ Use ONLY this method to update attributes :param host: :param port: :param protocol: :param stage: :return: """ parsed: Url = parse_url(host) # works with None _host = parsed.host _port = port or parsed.port or 443 # 443 default _stage = stage or parsed.path _protocol = protocol or parsed.scheme or \ (HTTPS_ATTR if _port == 443 else HTTP_ATTR) if _host: self.host = _host if _port: self.port = _port if _stage: self.stage = _stage.strip('/') if _protocol: self.protocol = _protocol.upper() @property def url(self) -> Optional[str]: if not self.host: return url = self.host.strip('/') # assuming that host is without protocol if self.port: url += f':{self.port}' if self.protocol: url = self.protocol.lower() + '://' + url if self.stage: url += '/' + self.stage return url # Applications meta and secrets definitions @dataclasses.dataclass() class DefectDojoApplicationMeta(AccessMeta): """ Application with type 'DEFECT_DOJO' """ @dataclasses.dataclass(repr=False) class DefectDojoApplicationSecret(DataclassBase): """ Application with type 'CUSTODIAN' secret """ api_key: str @dataclasses.dataclass() class CustodianApplicationMeta(AccessMeta): """ Application with type 'CUSTODIAN' meta """ username: Optional[str] results_storage: Optional[str] @dataclasses.dataclass() class RabbitMQApplicationMeta(DataclassBase): """ Application with type 'RABBITMQ' meta """ maestro_user: Optional[str] = None rabbit_exchange: Optional[str] = None request_queue: Optional[str] = None response_queue: Optional[str] = None sdk_access_key: Optional[str] = None @dataclasses.dataclass(repr=False) class RabbitMQApplicationSecret(DataclassBase): """ Application with type 'RABBITMQ' secret """ connection_url: str sdk_secret_key: str @dataclasses.dataclass() class AWSRoleApplicationMeta(DataclassBase): """ Application with type 'AWS_ROLE' meta """ roleName: str accountNumber: Optional[str] = None uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) @dataclasses.dataclass() class AWSCredentialsApplicationMeta(DataclassBase): """ Application with type 'AWS_CREDENTIALS' meta """ accountNumber: str uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) @dataclasses.dataclass(repr=False) class AWSCredentialsApplicationSecret(DataclassBase): """ Application with type 'AWS_CREDENTIALS' secret """ accessKeyId: str secretAccessKey: str sessionToken: Optional[str] = None defaultRegion: Optional[str] = None @dataclasses.dataclass() class AZURECredentialsApplicationMeta(DataclassBase): """ Application with type 'AZURE_CREDENTIALS' meta """ clientId: Optional[str] = None tenantId: Optional[str] = None uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) @dataclasses.dataclass(repr=False) class AZURECredentialsApplicationSecret(DataclassBase): """ Application with type 'AZURE_CREDENTIALS' secret """ client_id: str tenant_id: str api_key: str @dataclasses.dataclass() class AZURECertificateApplicationMeta(DataclassBase): """ Application with type 'AZURE_CERTIFICATE' meta """ clientId: Optional[str] = None tenantId: Optional[str] = None uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) @dataclasses.dataclass(repr=False) class AZURECertificateApplicationSecret(DataclassBase): """ Application with type 'AZURE_CERTIFICATE' secret """ certificate_base64: str certificate_password: Optional[str] = None @dataclasses.dataclass() class GCPServiceAccountApplicationMeta(DataclassBase): """ Application with type 'GCP_SERVICE_ACCOUNT', 'GCP_COMPUTE_ACCOUNT' meta """ adminProjectId: Optional[str] = None uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) # credentials definitions class GOOGLECredentialsRaw1(TypedDict): type: str project_id: str private_key_id: str private_key: str client_email: str client_id: str auth_uri: str token_uri: str auth_provider_x509_cert_url: str client_x509_cert_url: str # ----- not used currently ----- # class GOOGLECredentialsRaw2(TypedDict): # type: str # access_token: str # refresh_token: str # client_id: str # client_secret: str # project_id: str # # # class GOOGLECredentialsRaw3(TypedDict): # access_token: str # project_id: str # ----- not used currently ----- # K8SKubeConfigApplicationSecret contains raw kubeconfig class _CredentialsBase(DataclassBase): """ Some useful method for credentials. """ @property @contextmanager def export(self): _context = EnvironmentContext(self.dict(), reset_all=False) _context.set() try: yield finally: _context.clear() @dataclasses.dataclass(frozen=True, repr=False) class AWSCredentials(_CredentialsBase): AWS_ACCESS_KEY_ID: str AWS_SECRET_ACCESS_KEY: str AWS_SESSION_TOKEN: Optional[str] = None AWS_DEFAULT_REGION: Optional[str] = DEFAULT_AWS_REGION @dataclasses.dataclass(frozen=True, repr=False) class AZURECredentials(_CredentialsBase): AZURE_TENANT_ID: str AZURE_CLIENT_ID: str AZURE_CLIENT_SECRET: str AZURE_SUBSCRIPTION_ID: Optional[str] = None @dataclasses.dataclass(frozen=True, repr=False) class AZURECertificate(_CredentialsBase): AZURE_TENANT_ID: str AZURE_CLIENT_ID: str AZURE_CLIENT_CERTIFICATE_PATH: Path # full path to file AZURE_CLIENT_CERTIFICATE_PASSWORD: Optional[str] = None AZURE_SUBSCRIPTION_ID: Optional[str] = None def get_raw(self) -> io.BytesIO: stream = io.BytesIO() with open(self.AZURE_CLIENT_CERTIFICATE_PATH, 'rb') as fp: stream.write(fp.read()) stream.seek(0) return stream @dataclasses.dataclass(frozen=True, repr=False) class GOOGLECredentials(_CredentialsBase): GOOGLE_APPLICATION_CREDENTIALS: Path # full path to file CLOUDSDK_CORE_PROJECT: Optional[str] = None def get_raw(self) -> GOOGLECredentialsRaw1: with open(self.GOOGLE_APPLICATION_CREDENTIALS, 'r') as fp: return json.load(fp) @dataclasses.dataclass(frozen=True, repr=False) class RabbitMQCredentials(_CredentialsBase): connection_url: str sdk_secret_key: str maestro_user: Optional[str] = None rabbit_exchange: Optional[str] = None request_queue: Optional[str] = None response_queue: Optional[str] = None sdk_access_key: Optional[str] = None Credentials = Union[ AWSCredentials, AZURECredentials, AZURECertificate, GOOGLECredentials, RabbitMQCredentials, ] class MaestroCredentialsService: """ Allows to retrieve credentials from Maestro applications. Each method returns a frozen dataclass which contain credentials in their environment variables format (meaning that if you convert the dataclass object to dict and export each key and value to envs, credentials should work). The basic flow is the following: mcs = Modular().maestro_credentials_service() # credentials = mcs.get_by_parent('<parent id>') # or parent item # credentials = mcs.get_by_application('<application id>') # or item credentials = mcs.get_by_tenant('DEV2') # or item credentials = mcs.complete_credentials(credentials) if not credentials: return 'error' with credentials.export: boto3.client('sts').get_caller_identity() # or do something else with credentials Note: method `complete_credentials` is separated because in involves some logic which can be performed after credentials were retrieved from na application. For example, azure credentials require subscription id, but not all the applications contain it. So, we must get it from a tenant. """ CT = TypeVar('CT', bound=Credentials) def __init__(self, tenant_service: TenantService, parent_service: ParentService, application_service: ApplicationService, environment_service: EnvironmentService, ssm_service: AbstractSSMClient, sts_service: StsService): """ In case the service is deployed to AWS, we apparently will need to go to another AWS account to get ssm secrets from Application. We use modular_client.assume_role_ssm_service for this. For on-prem currently we use our own service, because Modular does not support Vault. """ self._tenant_service = tenant_service self._parent_service = parent_service self._application_service = application_service self._environment_service = environment_service self._ssm_service = ssm_service self._sts_service = sts_service @classmethod def build(cls, tenant_service: Optional[TenantService] = None, parent_service: Optional[ParentService] = None, application_service: Optional[ApplicationService] = None, environment_service: Optional[EnvironmentService] = None, ssm_service: Optional[AbstractSSMClient] = None, sts_service: Optional[StsService] = None ) -> 'MaestroCredentialsService': """ Allows to build the service specifying some services to override. SSM Service is expected to be overriden because some applications (like AWS_ROLE, AZURE_CREDENTIALS) have their secrets on Maestro prod, and some applications (RABBITMQ) will probably have their secrets on our prod. So we need different ssm clients :return: MaestroCredentialsService. By default, maestro ssm client is used. """ modular = Modular() return cls( tenant_service=tenant_service or modular.tenant_service(), parent_service=parent_service or modular.parent_service(), application_service=application_service or modular.application_service(), environment_service=environment_service or modular.environment_service(), ssm_service=ssm_service or modular.assume_role_ssm_service(), sts_service=sts_service or modular.sts_service() ) def _assure_tenant_obj(self, tenant: Union[Tenant, str] ) -> Optional[Tenant]: item = tenant if isinstance(tenant, Tenant) \ else self._tenant_service.get(tenant) return item if item and item.is_active else None def _assure_application_obj(self, application: Union[Application, str] ) -> Optional[Application]: item = application if isinstance(application, Application) \ else self._application_service.get_application_by_id(application) return item if item and not item.is_deleted else None def _assure_parent_obj(self, parent: Union[Parent, str] ) -> Optional[Parent]: item = parent if isinstance(parent, Parent) \ else self._parent_service.get_parent_by_id(parent) return item if item and not item.is_deleted else None @staticmethod def _parent_id_from_tenant(tenant: Tenant) -> Optional[str]: return tenant.management_parent_id def _default_aws_region(self) -> str: return (self._environment_service.aws_region() or self._environment_service.default_aws_region() or DEFAULT_AWS_REGION) def complete_credentials(self, credentials: Optional[CT], tenant: Tenant, **kwargs) -> Optional[CT]: """ Some credentials (for example AZURE) must be expanded with subscription id which is sited withing a tenant but not application. Here we handle these special cases """ if not credentials: return _LOG.info('Going to fulfill the credentials') return type(credentials)(**self.complete_credentials_dict( credentials=credentials.dict(), tenant=tenant, **kwargs )) @staticmethod def complete_credentials_dict(credentials: dict, tenant: Tenant, **kwargs) -> dict: _LOG.info('Going to fulfill the credentials') if tenant.cloud == Cloud.AZURE: _LOG.info('Tenant`s cloud is AZURE. Adding subscription id') if not credentials.get(ENV_AZURE_SUBSCRIPTION_ID): credentials[ENV_AZURE_SUBSCRIPTION_ID] = tenant.project return credentials elif tenant.cloud == Cloud.AWS: _LOG.info('Tenant`s cloud is AWS. Proxying creds') return credentials elif tenant.cloud == Cloud.GOOGLE: _LOG.info('Creds are requested for google tenant. ' 'Adding project id') credentials[ENV_CLOUDSDK_CORE_PROJECT] = tenant.project _LOG.debug(f'Google credentials project_id: ' f'{credentials[ENV_CLOUDSDK_CORE_PROJECT]}') return credentials _LOG.info('Not known cloud. Proxying whatever was received') return credentials def get_by_tenant(self, tenant: Union[Tenant, str], key: Optional[Callable[[Tenant], Optional[str]]] = None ) -> Optional[Credentials]: """ :param tenant: Union[Tenant, str] :param key: function which will retrieve parent_id from tenant. By default, management_parent_id is used :return: """ tenant_obj = self._assure_tenant_obj(tenant) if not tenant_obj: _LOG.warning(f'Tenant: {tenant} not found') return _get_parent_id = key or self._parent_id_from_tenant pid = _get_parent_id(tenant) if not pid: _LOG.warning(f'Tenant does not contain management ' f'parent id.') return return self.get_by_parent(pid, tenant_obj) def get_by_parent(self, parent: Union[Parent, str], tenant: Optional[Tenant] = None, ) -> Optional[Credentials]: parent_obj = self._assure_parent_obj(parent) if not parent_obj: _LOG.warning(f'Parent: {parent} not found') return aid = parent_obj.application_id if not aid: _LOG.warning(f'Parent {parent} does not contain application id.') return return self.get_by_application(aid, tenant) def get_by_application(self, application: Union[Application, str], tenant: Optional[Tenant] = None, ) -> Optional[Credentials]: """ Retrieves everything it can from application. Currently only some types are implemented """ application_obj = self._assure_application_obj(application) if not application_obj: _LOG.warning(f'Application: {application} not found') return getter = self.application_type_to_getter.get(application_obj.type) if not getter: _LOG.warning( f'Not available application type {application_obj.type}') return return getter(application_obj, tenant) @cached_property def application_type_to_getter( self) -> Dict[ str, Callable[[Application, Optional[Tenant]], Credentials]]: """ Method must have application as input """ return { ApplicationType.AZURE_CREDENTIALS: self._get_azure_credentials, ApplicationType.AZURE_CERTIFICATE: self._get_azure_certificate, ApplicationType.AWS_CREDENTIALS: self._get_aws_credentials, ApplicationType.AWS_ROLE: self._get_aws_credentials_from_role, ApplicationType.GCP_SERVICE_ACCOUNT: self._get_gcp_credentials, ApplicationType.GCP_COMPUTE_ACCOUNT: self._get_gcp_credentials, ApplicationType.RABBITMQ: self._get_rabbitmq_credentials, } def _get_aws_credentials_from_role(self, application: Application, tenant: Optional[Tenant] = None, ) -> Optional[AWSCredentials]: meta = AWSRoleApplicationMeta.from_dict(application.meta.as_dict()) role = meta.roleName account_id = tenant.project if tenant else meta.accountNumber if not role: return role_arn = self._sts_service.build_role_arn(role, account_id) try: creds = self._sts_service.assume_role( role_arn=role_arn, duration=3600, role_session_name=f'credentials-service-{str(int(time()))}' ) return AWSCredentials( AWS_ACCESS_KEY_ID=creds['aws_access_key_id'], AWS_SECRET_ACCESS_KEY=creds['aws_secret_access_key'], AWS_SESSION_TOKEN=creds['aws_session_token'], AWS_DEFAULT_REGION=self._default_aws_region() ) except (ClientError, ConnectionAbortedError) as e: _LOG.warning(f'Error occurred trying to assume role: {role}. {e}') return def _get_aws_credentials(self, application: Application, tenant: Optional[Tenant] = None, ) -> Optional[AWSCredentials]: if not application.secret: _LOG.warning(f'Application {application.application_id} does not ' f'contain secret') return secret = self._ssm_service.get_parameter(application.secret) if not secret: _LOG.warning(f'Secret {application.secret} exists in application,' f' but not in SSM') return secret = AWSCredentialsApplicationSecret.from_dict(secret) return AWSCredentials( AWS_ACCESS_KEY_ID=secret.accessKeyId, AWS_SECRET_ACCESS_KEY=secret.secretAccessKey, AWS_SESSION_TOKEN=secret.sessionToken, AWS_DEFAULT_REGION=secret.defaultRegion or self._default_aws_region() ) def _get_azure_credentials(self, application: Application, tenant: Optional[Tenant] = None, ) -> Optional[AZURECredentials]: if not application.secret: _LOG.warning(f'Application {application.application_id} does not ' f'contain secret') return secret = self._ssm_service.get_parameter(application.secret) if not secret: _LOG.warning(f'Secret {application.secret} exists in application,' f' but not in SSM') return if isinstance(secret, str): meta = AZURECredentialsApplicationMeta.from_dict( application.meta.as_dict() ) tenant_id = meta.tenantId client_id = meta.clientId api_key = secret else: # isinstance(secret, dict) secret = AZURECredentialsApplicationSecret.from_dict(secret) tenant_id = secret.tenant_id client_id = secret.client_id api_key = secret.api_key return AZURECredentials( AZURE_TENANT_ID=tenant_id, AZURE_CLIENT_ID=client_id, AZURE_CLIENT_SECRET=api_key, AZURE_SUBSCRIPTION_ID=tenant.project if tenant else None ) def _get_azure_certificate(self, application: Application, tenant: Optional[Tenant] = None, ) -> Optional[AZURECertificate]: if not application.secret: _LOG.warning(f'Application {application.application_id} does not ' f'contain secret') return secret = self._ssm_service.get_parameter(application.secret) if not secret: _LOG.warning(f'Secret {application.secret} exists in application,' f' but not in SSM') return meta = AZURECertificateApplicationMeta.from_dict( application.meta.as_dict() ) secret = AZURECertificateApplicationSecret.from_dict(secret) with tempfile.NamedTemporaryFile('wb', delete=False) as fp: fp.write(base64.b64decode(secret.certificate_base64)) return AZURECertificate( AZURE_TENANT_ID=meta.tenantId, AZURE_CLIENT_ID=meta.clientId, AZURE_CLIENT_CERTIFICATE_PATH=Path(fp.name), AZURE_CLIENT_CERTIFICATE_PASSWORD=secret.certificate_password ) def _get_gcp_credentials(self, application: Application, tenant: Optional[Tenant] = None, ) -> Optional[GOOGLECredentials]: if not application.secret: _LOG.warning(f'Application {application.application_id} does not ' f'contain secret') return secret = self._ssm_service.get_parameter(application.secret) if not secret: _LOG.warning(f'Secret {application.secret} exists in application,' f' but not in SSM') return if not isinstance(secret, dict): # it must be dict return meta = GCPServiceAccountApplicationMeta.from_dict( application.meta.as_dict() ) project_id = meta.adminProjectId secret.setdefault(MA_SSM_PROJECT_ID, project_id) # just in case with tempfile.NamedTemporaryFile('w', delete=False) as fp: json.dump(secret, fp) return GOOGLECredentials( GOOGLE_APPLICATION_CREDENTIALS=Path(fp.name), CLOUDSDK_CORE_PROJECT=project_id # or secret['project_id'] ? ) def _get_rabbitmq_credentials(self, application: Application, tenant: Optional[Tenant] = None ) -> Optional[RabbitMQCredentials]: if not application.secret: _LOG.warning(f'Application {application.application_id} does not ' f'contain secret') return secret = self._ssm_service.get_parameter(application.secret) if not secret: _LOG.warning(f'Secret {application.secret} exists in application,' f' but not in SSM') return if not isinstance(secret, dict): # it must be dict return meta = RabbitMQApplicationMeta.from_dict(application.meta.as_dict()) secret = RabbitMQApplicationSecret.from_dict(secret) return RabbitMQCredentials( connection_url=secret.connection_url, sdk_secret_key=secret.sdk_secret_key, maestro_user=meta.maestro_user, rabbit_exchange=meta.rabbit_exchange, request_queue=meta.request_queue, response_queue=meta.response_queue, sdk_access_key=meta.sdk_access_key )