in message_flow/sagas/orchestration/saga_manager_impl.py [0:0]
def _handle_reply(self, message: IMessage) -> None:
if not self._is_reply_for_this_saga_type(message):
return
self._logger.debug("Handle reply %s", message)
saga_id = message.get_required_header(SagaReplyHeaders.REPLY_SAGA_ID)
saga_type = message.get_required_header(SagaReplyHeaders.REPLY_SAGA_TYPE)
saga_instance = self._saga_instance_repository.find(saga_id)
saga_data: Data = SagaDataSerde.deserialize_saga_data(
saga_instance.serialized_saga_data,
self._saga_data_mapping,
)
current_state = saga_instance.state_name
self._logger.info("Current state=%s", current_state)
actions = self._state_definition.handle_reply(
saga_type, saga_id, current_state, saga_data, message
)
self._logger.info("Handled reply. Sending commands %s", actions.commands)
self._process_actions(saga_type, saga_id, saga_instance, saga_data, actions)