def _get_rabbitmq_credentials()

in modular_sdk/services/impl/maestro_credentials_service.py [0:0]


    def _get_rabbitmq_credentials(self, application: Application,
                                  tenant: Optional[Tenant] = None
                                  ) -> Optional[RabbitMQCredentials]:
        if not application.secret:
            _LOG.warning(f'Application {application.application_id} does not '
                         f'contain secret')
            return
        secret = self._ssm_service.get_parameter(application.secret)
        if not secret:
            _LOG.warning(f'Secret {application.secret} exists in application,'
                         f' but not in SSM')
            return
        if not isinstance(secret, dict):  # it must be dict
            return
        meta = RabbitMQApplicationMeta.from_dict(application.meta.as_dict())
        secret = RabbitMQApplicationSecret.from_dict(secret)
        return RabbitMQCredentials(
            connection_url=secret.connection_url,
            sdk_secret_key=secret.sdk_secret_key,
            maestro_user=meta.maestro_user,
            rabbit_exchange=meta.rabbit_exchange,
            request_queue=meta.request_queue,
            response_queue=meta.response_queue,
            sdk_access_key=meta.sdk_access_key
        )