modular_sdk/services/sqs_service.py (34 lines of code) (raw):

import json from botocore.exceptions import ParamValidationError, ClientError from modular_sdk.commons.log_helper import get_logger from modular_sdk.services.aws_creds_provider import AWSCredentialsProvider from modular_sdk.services.environment_service import EnvironmentService _LOG = get_logger(__name__) class SQSService(AWSCredentialsProvider): def __init__(self, aws_region, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, environment_service: EnvironmentService = None): super(SQSService, self).__init__( service_name='sqs', aws_region=aws_region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token ) self.environment_service = environment_service def send_message(self, message: dict) -> bool: queue = self.environment_service.queue_url() if not queue: _LOG.warning('SQS Queue is not set to envs. Message won`t be set') return False try: self.client.send_message( QueueUrl=queue, MessageBody=json.dumps(message) ) return True except (ClientError, ParamValidationError) as e: _LOG.warning( f'Cannot push message to the SQS \'{queue}\' queue: {e}' ) return False