saga/saga_state.go (336 lines of code) (raw):

package saga import ( "fmt" ) type InvalidSagaStateError struct { s string } func (e InvalidSagaStateError) Error() string { return e.s } func NewInvalidSagaStateError(msg string, args ...interface{}) error { return InvalidSagaStateError{ s: fmt.Sprintf(msg, args...), } } type InvalidSagaMessageError struct { s string } func (e InvalidSagaMessageError) Error() string { return e.s } func NewInvalidSagaMessageError(msg string) error { return InvalidSagaMessageError{ s: msg, } } type flag byte const ( TaskStarted flag = 1 << iota TaskCompleted CompTaskStarted CompTaskCompleted ) /* * Additional data about tasks to be committed to the log * This is Opaque to SagaState, but useful data to persist for * results or debugging */ type taskData struct { taskStart []byte taskEnd []byte compTaskStart []byte compTaskEnd []byte } /* * Data Structure representation of the current state of the Saga. */ type SagaState struct { sagaId string job []byte // map of taskID to task data supplied when committing // startTask, endTask, startCompTask, endCompTask messages taskData map[string]*taskData // map of taskId to Flag specifying task progress taskState map[string]flag //bool if AbortSaga message logged sagaAborted bool //bool if EndSaga message logged sagaCompleted bool } /* * Initialize a Default Empty Saga */ func initializeSagaState() *SagaState { return &SagaState{ sagaId: "", job: nil, taskState: make(map[string]flag), taskData: make(map[string]*taskData), sagaAborted: false, sagaCompleted: false, } } /* * Returns the Id of the Saga this state represents */ func (state *SagaState) SagaId() string { return state.sagaId } /* * Returns the Job associated with this Saga */ func (state *SagaState) Job() []byte { return state.job } /* * Returns a lists of task ids associated with this Saga */ func (state *SagaState) GetTaskIds() []string { taskIds := make([]string, 0, len(state.taskState)) for id := range state.taskState { taskIds = append(taskIds, id) } return taskIds } /* * Returns true if the specified Task has been started, * fasle otherwise */ func (state *SagaState) IsTaskStarted(taskId string) bool { flags, _ := state.taskState[taskId] return flags&TaskStarted != 0 } /* * Get Data Associated with Start Task, supplied as * Part of the StartTask Message */ func (state *SagaState) GetStartTaskData(taskId string) []byte { data, ok := state.taskData[taskId] if ok { return data.taskStart } else { return nil } } /* * Returns true if the specified Task has been completed, * fasle otherwise */ func (state *SagaState) IsTaskCompleted(taskId string) bool { flags, _ := state.taskState[taskId] return flags&TaskCompleted != 0 } /* * Get Data Associated with End Task, supplied as * Part of the EndTask Message */ func (state *SagaState) GetEndTaskData(taskId string) []byte { data, ok := state.taskData[taskId] if ok { return data.taskEnd } else { return nil } } /* * Returns true if the specified Compensating Task has been started, * fasle otherwise */ func (state *SagaState) IsCompTaskStarted(taskId string) bool { flags, _ := state.taskState[taskId] return flags&CompTaskStarted != 0 } /* * Get Data Associated with Starting Comp Task, supplied as * Part of the StartCompTask Message */ func (state *SagaState) GetStartCompTaskData(taskId string) []byte { data, ok := state.taskData[taskId] if ok { return data.compTaskStart } else { return nil } } /* * Returns true if the specified Compensating Task has been completed, * fasle otherwise */ func (state *SagaState) IsCompTaskCompleted(taskId string) bool { flags, _ := state.taskState[taskId] return flags&CompTaskCompleted != 0 } /* * Get Data Associated with End Comp Task, supplied as * Part of the EndCompTask Message */ func (state *SagaState) GetEndCompTaskData(taskId string) []byte { data, ok := state.taskData[taskId] if ok { return data.compTaskEnd } else { return nil } } /* * Returns true if this Saga has been Aborted, false otherwise */ func (state *SagaState) IsSagaAborted() bool { return state.sagaAborted } /* * Returns true if this Saga has been Completed, false otherwise */ func (state *SagaState) IsSagaCompleted() bool { return state.sagaCompleted } /* * Add the data for the specified message type to the task metadata fields. * This Data is stored in the SagaState and persisted to durable saga log, so it * can be recovered. It is opaque to sagas but useful to persist for applications. */ func (state *SagaState) addTaskData(taskId string, msgType SagaMessageType, data []byte) { tData, ok := state.taskData[taskId] if !ok { tData = &taskData{} state.taskData[taskId] = tData } switch msgType { case StartTask: state.taskData[taskId].taskStart = data case EndTask: state.taskData[taskId].taskEnd = data case StartCompTask: state.taskData[taskId].compTaskStart = data case EndCompTask: state.taskData[taskId].compTaskEnd = data } } /* * Applies the supplied messages atomically to the supplied sagaState. * Returns the mutated sate if all the messages are valid, * otherwise returns the original state if any of the messages are invalid along with the error */ func bulkUpdateSagaState(state *SagaState, msgs []SagaMessage) (*SagaState, error) { originalState := copySagaState(state) var err error for _, msg := range msgs { err = updateSagaState(state, msg) if err != nil { return originalState, err } } return state, nil } /* * Applies the supplied message to the supplied sagaState. * Mutates state directly. * * Returns an Error if applying the message would result in an invalid Saga State. * Client must not use state after updateSagaState returns a non-nil error. */ func updateSagaState(state *SagaState, msg SagaMessage) error { if msg.SagaId != state.sagaId { return NewInvalidSagaMessageError(fmt.Sprintf("sagaId %s & SagaMessage sagaId %s do not match", state.sagaId, msg.SagaId)) } switch msg.MsgType { case StartSaga: return NewInvalidSagaStateError("Cannot apply a StartSaga Message to an already existing Saga") case EndSaga: //A Successfully Completed Saga must have StartTask/EndTask pairs for all messages or //an aborted Saga must have StartTask/StartCompTask/EndCompTask pairs for all messages for taskId := range state.taskState { if state.sagaAborted { if !(state.IsCompTaskStarted(taskId) && state.IsCompTaskCompleted(taskId)) { return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to an aborted Saga where Task %s has not completed its compensating Tasks", taskId)) } } else { if !state.IsTaskCompleted(taskId) { return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to a Saga where Task %s has not completed", taskId)) } } } state.sagaCompleted = true case AbortSaga: if state.IsSagaCompleted() { return NewInvalidSagaStateError("AbortSaga Message cannot be applied to a Completed Saga") } state.sagaAborted = true case StartTask: err := validateTaskId(msg.TaskId) if err != nil { return err } if state.IsSagaCompleted() { return NewInvalidSagaStateError("Cannot StartTask after Saga has been completed: %s", msg.TaskId) } if state.IsSagaAborted() { return NewInvalidSagaStateError("Cannot StartTask after Saga has been aborted") } if state.IsTaskCompleted(msg.TaskId) { return NewInvalidSagaStateError("Cannot StartTask after it has been completed: %s", msg.TaskId) } if msg.Data != nil { state.addTaskData(msg.TaskId, msg.MsgType, msg.Data) } state.taskState[msg.TaskId] = TaskStarted case EndTask: err := validateTaskId(msg.TaskId) if err != nil { return err } if state.IsSagaCompleted() { return NewInvalidSagaStateError("Cannot EndTask after Saga has been completed") } if state.IsSagaAborted() { return NewInvalidSagaStateError("Cannot EndTask after an Abort Saga Message") } // All EndTask Messages must have a preceding StartTask Message if !state.IsTaskStarted(msg.TaskId) { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndTask Message Before a StartTask Message, taskId: %s", msg.TaskId)) } state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | TaskCompleted if msg.Data != nil { state.addTaskData(msg.TaskId, msg.MsgType, msg.Data) } case StartCompTask: err := validateTaskId(msg.TaskId) if err != nil { return err } if state.IsSagaCompleted() { return NewInvalidSagaStateError("Cannot StartCompTask after Saga has been completed") } //In order to apply compensating transactions a saga must first be aborted if !state.IsSagaAborted() { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId)) } // All StartCompTask Messages must have a preceding StartTask Message if !state.IsTaskStarted(msg.TaskId) { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId)) } if state.IsCompTaskCompleted(msg.TaskId) { return NewInvalidSagaStateError(fmt.Sprintf("Cannot StartCompTask after it has been completed, taskId: %s", msg.TaskId)) } state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | CompTaskStarted if msg.Data != nil { state.addTaskData(msg.TaskId, msg.MsgType, msg.Data) } case EndCompTask: err := validateTaskId(msg.TaskId) if err != nil { return err } if state.IsSagaCompleted() { return NewInvalidSagaStateError("Cannot EndCompTask after Saga has been completed") } //in order to apply compensating transactions a saga must first be aborted if !state.IsSagaAborted() { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId)) } // All EndCompTask Messages must have a preceding StartTask Message if !state.IsTaskStarted(msg.TaskId) { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId)) } // All EndCompTask Messages must have a preceding StartCompTask Message if !state.IsCompTaskStarted(msg.TaskId) { return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message Before a StartCompTaks Message, taskId: %s", msg.TaskId)) } if msg.Data != nil { state.addTaskData(msg.TaskId, msg.MsgType, msg.Data) } state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | CompTaskCompleted } return nil } /* * Creates a copy of mutable saga state. Does not copy * binary data, only pointers to it. */ func copySagaState(s *SagaState) *SagaState { newS := &SagaState{ sagaId: s.sagaId, sagaAborted: s.sagaAborted, sagaCompleted: s.sagaCompleted, } newS.taskState = make(map[string]flag) for key, value := range s.taskState { newS.taskState[key] = value } newS.taskData = make(map[string]*taskData) for key, value := range s.taskData { newS.taskData[key] = &taskData{ taskStart: value.taskStart, taskEnd: value.taskEnd, compTaskStart: value.compTaskStart, compTaskEnd: value.compTaskEnd, } } newS.job = s.job return newS } /* * Validates that a SagaId Is valid. Returns error if valid, nil otherwise */ func validateSagaId(sagaId string) error { if sagaId == "" { return NewInvalidSagaMessageError("sagaId cannot be the empty string") } else { return nil } } /* * Validates that a TaskId Is valid. Returns error if valid, nil otherwise */ func validateTaskId(taskId string) error { if taskId == "" { return NewInvalidSagaMessageError("taskId cannot be the empty string") } else { return nil } } /* * Initialize a SagaState for the specified saga, and default data. */ func makeSagaState(sagaId string, job []byte) (*SagaState, error) { state := initializeSagaState() err := validateSagaId(sagaId) if err != nil { return nil, err } state.sagaId = sagaId state.job = job return state, nil } /* * Custom ToString function for SagaState */ func (state *SagaState) String() string { fmtString := "{ SagaId: %v, " + "SagaAborted: %v, " + "SagaCompleted: %v, " + "Tasks: [ " for _, id := range state.GetTaskIds() { taskState := "" if state.IsTaskStarted(id) { taskState += "Started|" } if state.IsTaskCompleted(id) { taskState += "Completed|" } if state.IsCompTaskStarted(id) { taskState += "CompTaskStarted|" } if state.IsCompTaskCompleted(id) { taskState += "CompTaskCompleted|" } // remove trailing slash if len(taskState) >= 1 { taskState = taskState[0 : len(taskState)-1] } fmtString += fmt.Sprintf("%v: %s, ", id, taskState) } fmtString += "]" return fmt.Sprintf( fmtString, state.sagaId, state.IsSagaAborted(), state.IsSagaCompleted(), ) }