message_flow/sagas/orchestration/saga_command_producer.py (27 lines of code) (raw):

from typing import List from ...commands.consumer import CommandWithDestination from ...commands.producer import CommandProducer from ..common import SagaCommandHeaders # type: ignore __all__ = ["SagaCommandProducer"] class SagaCommandProducer: def __init__(self, command_producer: CommandProducer) -> None: self._command_producer = command_producer def send_commands( self, saga_type: str, saga_id: str, commands: List[CommandWithDestination], saga_reply_channel: str, ) -> str: message_id: str = "" for command in commands: headers = command.extra_headers headers[SagaCommandHeaders.SAGA_TYPE] = saga_type headers[SagaCommandHeaders.SAGA_ID] = saga_id message_id = self._command_producer.send( command.destination_channel, command.command, saga_reply_channel, headers=headers, ) return message_id