message_flow/events/publisher/domain_event_publisher.py (40 lines of code) (raw):

import logging from typing import Dict, List from message_flow.events.common import DomainEvent, make_message_for_domain_event from message_flow.events.mappers import ISerializer, JsonMapper from message_flow.messaging.producer import IMessageProducer _logger = logging.getLogger(__name__) class DomainEventPublisher: def __init__( self, message_producer: IMessageProducer, *, serializer: ISerializer = JsonMapper() ) -> None: self._message_producer = message_producer self._serializer = serializer def publish( self, aggregate_type: str, aggregate_id: str, domain_events: List[DomainEvent], *, headers: Dict[str, str] = {} ) -> None: for event in domain_events: payload: bytes = self._serializer.serialize(event) _logger.info( "Publishing event %s with payload %s...", event.__class__.__name__, payload.decode(), ) self._message_producer.send( aggregate_type, make_message_for_domain_event( aggregate_type, aggregate_id, payload, event.__class__.__name__, headers=headers, ), )