message_flow/messaging/producer/message_builder.py (23 lines of code) (raw):
from typing import Dict
from message_flow.messaging.common import IMessage, Message
class MessageBuilder:
def __init__(self, body: bytes, *, headers: Dict[str, str] = None) -> None:
self._body = body
self._headers = headers if headers is not None else {}
@classmethod
def with_message(cls, message: IMessage) -> "MessageBuilder":
return cls(message.payload, headers=message.headers)
@classmethod
def with_payload(cls, payload: bytes) -> "MessageBuilder":
return cls(payload)
def with_header(self, name: str, value: str) -> "MessageBuilder":
self._headers[name] = value
return self
def with_extra_headers(
self, prefix: str, headers: Dict[str, str]
) -> "MessageBuilder":
for key, value in headers.items():
self._headers[prefix + key] = value
return self
def build(self) -> IMessage:
return Message(self._body, self._headers)