in message_flow/sagas/orchestration_simple_dsl/participant_invocation_step.py [0:0]
def is_successful_reply(self, compensating: bool, message: IMessage) -> bool:
    """Report whether ``message`` counts as a successful reply for this step.

    The decision is delegated to the participant invocation returned by
    ``self._get_participant_invocation(compensating)``.

    Args:
        compensating: Forwarded to ``_get_participant_invocation`` to pick
            which invocation should judge the reply.
        message: The reply message to evaluate.

    Returns:
        Whatever the selected invocation's ``is_successful_reply`` returns
        for ``message``.

    Raises:
        RuntimeError: If no invocation is available (the lookup gave ``None``).
    """
    invocation = self._get_participant_invocation(compensating)

    # Guard clause: without an invocation there is nothing to delegate to.
    if invocation is None:
        raise RuntimeError("No invocation for the step.")

    return invocation.is_successful_reply(message)