in src/run.py [0:0]
def get_credentials(tenant: Tenant,
batch_results: BatchResults | None = None) -> dict:
"""
Tries to retrieve credentials to scan the given tenant with such
priorities:
1. env "CREDENTIALS_KEY" - gets key name and then gets credentials
from SSM. This is the oldest solution, in can sometimes be used if
the job is standard and a user has given credentials directly;
The SSM parameter is removed after the creds are received.
2. Only for event-driven jobs. Gets credentials_key (SSM parameter name)
from "batch_result.credentials_key". Currently, the option is obsolete.
API in no way will set credentials key there. But maybe some time...
3. 'CUSTODIAN_ACCESS' key in the tenant's parent_map. It points to the
parent with type 'CUSTODIAN_ACCESS' as well. That parent is linked
to an application with credentials
4. Maestro management_parent_id -> management creds. Tries to resolve
management parent from tenant and then management credentials. This
option can be used only if the corresponding env is set to 'true'.
Must be explicitly allowed because the option is not safe.
5. Checks whether instance by default has access to the given tenant
If not credentials are found, ExecutorException is raised
"""
mcs = SP.modular_client.maestro_credentials_service()
_log_start = 'Trying to get credentials from '
credentials = {}
# 1.
if not credentials:
_LOG.info(_log_start + '\'CREDENTIALS_KEY\' env')
credentials = BSP.credentials_service.get_credentials_from_ssm()
if credentials and tenant.cloud == Cloud.GOOGLE:
credentials = BSP.credentials_service.google_credentials_to_file(
credentials)
# 2.
if not credentials and batch_results and batch_results.credentials_key:
_LOG.info(_log_start + 'batch_results.credentials_key')
credentials = BSP.credentials_service.get_credentials_from_ssm(
batch_results.credentials_key)
if credentials and tenant.cloud == Cloud.GOOGLE:
credentials = BSP.credentials_service.google_credentials_to_file(
credentials)
# 3.
if not credentials:
_LOG.info(_log_start + '`CUSTODIAN_ACCESS` parent')
parent = SP.modular_client.parent_service().get_linked_parent_by_tenant(
tenant=tenant,
type_=ParentType.CUSTODIAN_ACCESS
)
if parent:
application = SP.modular_client.application_service().get_application_by_id(parent.application_id)
if application:
_creds = mcs.get_by_application(application, tenant)
if _creds:
credentials = _creds.dict()
# 4.
if not credentials and BSP.environment_service.is_management_creds_allowed():
_LOG.info(_log_start + 'Maestro management parent & application')
_creds = mcs.get_by_tenant(tenant=tenant)
if _creds: # not a dict
credentials = _creds.dict()
# 5
if not credentials:
_LOG.info(_log_start + 'instance profile')
# TODO refactor
match tenant.cloud:
case Cloud.AWS:
try:
aid = StsClient.factory().build().get_caller_identity()['Account']
_LOG.debug('Instance profile found')
if aid == tenant.project:
_LOG.info('Instance profile credentials match to tenant id')
return {}
except (Exception, ClientError) as e:
_LOG.warning(f'No instance credentials found: {e}')
case Cloud.AZURE:
try:
from c7n_azure.session import Session
aid = Session().subscription_id
_LOG.info('subscription id found')
if aid == tenant.project:
_LOG.info('Subscription id matches to tenant id')
return {}
except BaseException: # catch sys.exit(1)
_LOG.warning('Could not find azure subscription id')
if credentials:
credentials = mcs.complete_credentials_dict(
credentials=credentials,
tenant=tenant
)
return credentials
raise ExecutorException(ExecutorError.NO_CREDENTIALS)