in modular_sdk/services/impl/maestro_credentials_service.py [0:0]
def _get_rabbitmq_credentials(self, application: Application,
                              tenant: Optional[Tenant] = None
                              ) -> Optional[RabbitMQCredentials]:
    """
    Build RabbitMQ credentials from an application and its SSM secret.

    The connection URL and SDK secret key are taken from the secret
    stored in SSM under ``application.secret``; the remaining fields
    are taken from the application meta.

    :param application: application whose ``secret`` names the SSM
        parameter and whose ``meta`` holds the non-secret settings
    :param tenant: not used by this implementation
    :return: the assembled credentials, or None when the application
        has no secret, the secret is absent from SSM, or the value
        stored in SSM is not a dict
    """
    if not application.secret:
        _LOG.warning(f'Application {application.application_id} does not '
                     f'contain secret')
        return None
    secret_value = self._ssm_service.get_parameter(application.secret)
    if not secret_value:
        _LOG.warning(f'Secret {application.secret} exists in application,'
                     f' but not in SSM')
        return None
    if not isinstance(secret_value, dict):
        # Report the malformed secret like the other failure branches do.
        # Only the type is logged: the value itself may be sensitive.
        _LOG.warning(f'Secret {application.secret} is expected to be a '
                     f'dict, but got {type(secret_value).__name__}')
        return None
    meta = RabbitMQApplicationMeta.from_dict(application.meta.as_dict())
    secret = RabbitMQApplicationSecret.from_dict(secret_value)
    return RabbitMQCredentials(
        connection_url=secret.connection_url,
        sdk_secret_key=secret.sdk_secret_key,
        maestro_user=meta.maestro_user,
        rabbit_exchange=meta.rabbit_exchange,
        request_queue=meta.request_queue,
        response_queue=meta.response_queue,
        sdk_access_key=meta.sdk_access_key
    )