message_flow/events/subscriber/domain_event_envelope.py (33 lines of code) (raw):
from typing import Generic, TypeVar
from message_flow.events.common import DomainEvent
from message_flow.messaging.common import IMessage
T = TypeVar("T", bound=DomainEvent)
class DomainEventEnvelope(Generic[T]):
def __init__(
self,
message: IMessage,
aggregate_type: str,
aggregate_id: str,
event_id: str,
event: T,
) -> None:
self._message = message
self._aggregate_type = aggregate_type
self._aggregate_id = aggregate_id
self._event_id = event_id
self._event = event
@property
def aggregate_id(self) -> str:
return self._aggregate_id
@property
def message(self) -> IMessage:
return self._message
@property
def event(self) -> T:
return self._event
@property
def aggregate_type(self) -> str:
return self._aggregate_type
@property
def event_id(self) -> str:
return self._event_id