message_flow/commands/producer/command_producer.py (39 lines of code) (raw):
import logging
from typing import Dict, Optional
from message_flow.commands.common import Command, make_message_for_command
from message_flow.events.mappers.interfaces import ISerializer
from message_flow.events.mappers.json_mapper import JsonMapper
from message_flow.messaging.common.interfaces import IMessage
from message_flow.messaging.producer.message_producer import IMessageProducer
_logger = logging.getLogger(__name__)
class CommandProducer:
def __init__(
self,
message_producer: IMessageProducer,
*,
serializer: ISerializer = JsonMapper()
) -> None:
self._message_producer = message_producer
self._serializer = serializer
def send(
self,
channel: str,
command: Command,
reply_to: str,
*,
headers: Optional[Dict[str, str]] = None
) -> str:
message: IMessage = make_message_for_command(
channel,
self._serializer.serialize(command),
command.__class__.__name__,
reply_to,
headers=headers if headers else {},
)
_logger.info(
"Sending command %s with payload %s...",
command.__class__.__name__,
message.payload.decode(),
)
self._message_producer.send(channel, message)
return message.get_id()