in saga/saga_state.go [271:416]
// updateSagaState applies msg to state in place, enforcing the ordering rules
// between saga messages.
//
// The message is rejected when its SagaId differs from the state's sagaId, or
// when its type is not legal for the current state:
//
//   - StartSaga is always rejected; the state already represents a saga.
//   - EndSaga requires every recorded task to be completed, or, when the saga
//     has been aborted, every recorded task to have a started and completed
//     compensating task.
//   - AbortSaga is rejected once the saga is completed.
//   - StartTask is rejected once the saga is completed or aborted, or once
//     that task has completed.
//   - EndTask is rejected once the saga is completed or aborted, or when the
//     task has not been started.
//   - StartCompTask is rejected once the saga is completed, before the saga
//     is aborted, when the task has not been started, or once its
//     compensating task has completed.
//   - EndCompTask is rejected once the saga is completed, before the saga is
//     aborted, when the task has not been started, or when its compensating
//     task has not been started.
//
// Task-scoped messages additionally have their TaskId checked by
// validateTaskId, whose error is returned unchanged. Message types not listed
// above are ignored and nil is returned.
//
// Every validation in this function runs before the first write to state, so
// an error return from here never follows a partial update.
func updateSagaState(state *SagaState, msg SagaMessage) error {
	if msg.SagaId != state.sagaId {
		return NewInvalidSagaMessageError(fmt.Sprintf("sagaId %s & SagaMessage sagaId %s do not match", state.sagaId, msg.SagaId))
	}

	switch msg.MsgType {
	case StartSaga:
		return NewInvalidSagaStateError("Cannot apply a StartSaga Message to an already existing Saga")

	case EndSaga:
		// A successfully completed saga must have StartTask/EndTask pairs for
		// every task; an aborted saga must have
		// StartTask/StartCompTask/EndCompTask for every task.
		for taskId := range state.taskState {
			if state.sagaAborted {
				if !(state.IsCompTaskStarted(taskId) && state.IsCompTaskCompleted(taskId)) {
					return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to an aborted Saga where Task %s has not completed its compensating Tasks", taskId))
				}
				continue
			}
			if !state.IsTaskCompleted(taskId) {
				return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to a Saga where Task %s has not completed", taskId))
			}
		}
		state.sagaCompleted = true

	case AbortSaga:
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("AbortSaga Message cannot be applied to a Completed Saga")
		}
		state.sagaAborted = true

	case StartTask:
		if err := validateTaskId(msg.TaskId); err != nil {
			return err
		}
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot StartTask after Saga has been completed: %s", msg.TaskId)
		}
		if state.IsSagaAborted() {
			return NewInvalidSagaStateError("Cannot StartTask after Saga has been aborted")
		}
		if state.IsTaskCompleted(msg.TaskId) {
			return NewInvalidSagaStateError("Cannot StartTask after it has been completed: %s", msg.TaskId)
		}
		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}
		// Plain assignment: replaces whatever status was previously recorded
		// for this task rather than OR-ing into it.
		state.taskState[msg.TaskId] = TaskStarted

	case EndTask:
		if err := validateTaskId(msg.TaskId); err != nil {
			return err
		}
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot EndTask after Saga has been completed")
		}
		if state.IsSagaAborted() {
			return NewInvalidSagaStateError("Cannot EndTask after an Abort Saga Message")
		}
		// All EndTask messages must have a preceding StartTask message.
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}
		state.taskState[msg.TaskId] |= TaskCompleted
		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

	case StartCompTask:
		if err := validateTaskId(msg.TaskId); err != nil {
			return err
		}
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot StartCompTask after Saga has been completed")
		}
		// Compensating transactions may only be applied to an aborted saga.
		if !state.IsSagaAborted() {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId))
		}
		// All StartCompTask messages must have a preceding StartTask message.
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}
		if state.IsCompTaskCompleted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot StartCompTask after it has been completed, taskId: %s", msg.TaskId))
		}
		state.taskState[msg.TaskId] |= CompTaskStarted
		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

	case EndCompTask:
		if err := validateTaskId(msg.TaskId); err != nil {
			return err
		}
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot EndCompTask after Saga has been completed")
		}
		// Compensating transactions may only be applied to an aborted saga.
		if !state.IsSagaAborted() {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId))
		}
		// All EndCompTask messages must have a preceding StartTask message.
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}
		// All EndCompTask messages must have a preceding StartCompTask message.
		if !state.IsCompTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message Before a StartCompTask Message, taskId: %s", msg.TaskId))
		}
		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}
		state.taskState[msg.TaskId] |= CompTaskCompleted
	}

	return nil
}