message_flow/commands/consumer/command_handler_params.py (23 lines of code) (raw):
from typing import Dict, Optional
from message_flow.commands.common import CommandMessageHeaders, ReplyMessageHeaders
from message_flow.messaging.common import IMessage
class CommandHandlerParams:
def __init__(self, message: IMessage) -> None:
self._correlation_headers = self._get_correlation_headers(message.headers)
self._default_reply_channel = message.get_header(CommandMessageHeaders.REPLY_TO)
@property
def correlation_headers(self) -> Dict[str, str]:
return self._correlation_headers
@property
def default_reply_channel(self) -> Optional[str]:
return self._default_reply_channel
def _get_correlation_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
correlation_headers: Dict[str, str] = {
CommandMessageHeaders.in_reply(key): value
for key, value in headers.items()
if key.startswith(CommandMessageHeaders.COMMAND_HEADER_PREFIX)
}
correlation_headers[ReplyMessageHeaders.IN_REPLY_TO] = headers.get(
IMessage.ID, ""
)
return correlation_headers