message_flow/events/common/event_util.py (22 lines of code) (raw):

from typing import Dict from uuid import uuid4 from message_flow.events.common.event_message_headers import EventMessageHeaders from message_flow.messaging.common import IMessage from message_flow.messaging.producer import MessageBuilder def make_message_for_domain_event( aggregate_type: str, aggregate_id: str, payload: bytes, event_type: str, *, headers: Dict[str, str] = None, ) -> IMessage: return ( MessageBuilder.with_payload(payload) .with_extra_headers("", headers if headers else {}) .with_header(EventMessageHeaders.AGGREGATE_ID, aggregate_id) .with_header(EventMessageHeaders.AGGREGATE_TYPE, aggregate_type) .with_header(EventMessageHeaders.EVENT_TYPE, event_type) .with_header(IMessage.ID, uuid4().hex) .build() )