message_flow/messaging/common/message.py (36 lines of code) (raw):

from typing import Any, Dict, Optional from message_flow.messaging.common.interfaces import IMessage from message_flow.messaging.exceptions import HeaderNotFound class Message(IMessage): def __init__(self, payload: bytes, headers: Dict[str, str]) -> None: self._payload = payload self._headers = headers def __str__(self) -> str: return f"{self._payload!r}" @property def payload(self) -> bytes: return self._payload @payload.setter def payload(self, payload: bytes) -> None: self._payload = payload @property def headers(self) -> Dict[str, Any]: return self._headers @headers.setter def headers(self, headers: Dict[str, str]) -> None: self._headers = headers def get_id(self) -> str: return self.get_required_header(Message.ID) def get_header(self, name: str) -> Optional[str]: return self._headers.get(name) def get_required_header(self, name: str) -> str: value = self._headers.get(name) if value is None: raise HeaderNotFound(f"No such header: {name} in this message {self}") return value def has_header(self, name: str) -> bool: return name in self._headers def set_header(self, name: str, value: str) -> None: self._headers[name] = value def remove_header(self, name: str) -> None: del self._headers[name]