func updateSagaState()

in saga/saga_state.go [271:416]


func updateSagaState(state *SagaState, msg SagaMessage) error {
	if msg.SagaId != state.sagaId {
		return NewInvalidSagaMessageError(fmt.Sprintf("sagaId %s & SagaMessage sagaId %s do not match", state.sagaId, msg.SagaId))
	}

	switch msg.MsgType {
	case StartSaga:
		return NewInvalidSagaStateError("Cannot apply a StartSaga Message to an already existing Saga")

	case EndSaga:
		//A Successfully Completed Saga must have StartTask/EndTask pairs for all messages or
		//an aborted Saga must have StartTask/StartCompTask/EndCompTask pairs for all messages
		for taskId := range state.taskState {
			if state.sagaAborted {
				if !(state.IsCompTaskStarted(taskId) && state.IsCompTaskCompleted(taskId)) {
					return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to an aborted Saga where Task %s has not completed its compensating Tasks", taskId))
				}
			} else {
				if !state.IsTaskCompleted(taskId) {
					return NewInvalidSagaStateError(fmt.Sprintf("End Saga Message cannot be applied to a Saga where Task %s has not completed", taskId))
				}
			}
		}

		state.sagaCompleted = true

	case AbortSaga:
		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("AbortSaga Message cannot be applied to a Completed Saga")
		}

		state.sagaAborted = true

	case StartTask:
		err := validateTaskId(msg.TaskId)
		if err != nil {
			return err
		}

		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot StartTask after Saga has been completed: %s", msg.TaskId)
		}

		if state.IsSagaAborted() {
			return NewInvalidSagaStateError("Cannot StartTask after Saga has been aborted")
		}

		if state.IsTaskCompleted(msg.TaskId) {
			return NewInvalidSagaStateError("Cannot StartTask after it has been completed: %s", msg.TaskId)
		}

		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

		state.taskState[msg.TaskId] = TaskStarted

	case EndTask:
		err := validateTaskId(msg.TaskId)
		if err != nil {
			return err
		}

		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot EndTask after Saga has been completed")
		}

		if state.IsSagaAborted() {
			return NewInvalidSagaStateError("Cannot EndTask after an Abort Saga Message")
		}

		// All EndTask Messages must have a preceding StartTask Message
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}

		state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | TaskCompleted

		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

	case StartCompTask:
		err := validateTaskId(msg.TaskId)
		if err != nil {
			return err
		}

		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot StartCompTask after Saga has been completed")
		}

		//In order to apply compensating transactions a saga must first be aborted
		if !state.IsSagaAborted() {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId))
		}

		// All StartCompTask Messages must have a preceding StartTask Message
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}

		if state.IsCompTaskCompleted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot StartCompTask after it has been completed, taskId: %s", msg.TaskId))
		}

		state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | CompTaskStarted

		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

	case EndCompTask:
		err := validateTaskId(msg.TaskId)
		if err != nil {
			return err
		}

		if state.IsSagaCompleted() {
			return NewInvalidSagaStateError("Cannot EndCompTask after Saga has been completed")
		}

		//in order to apply compensating transactions a saga must first be aborted
		if !state.IsSagaAborted() {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message when Saga has not been Aborted, taskId: %s", msg.TaskId))
		}

		// All EndCompTask Messages must have a preceding StartTask Message
		if !state.IsTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a StartCompTask Message Before a StartTask Message, taskId: %s", msg.TaskId))
		}

		// All EndCompTask Messages must have a preceding StartCompTask Message
		if !state.IsCompTaskStarted(msg.TaskId) {
			return NewInvalidSagaStateError(fmt.Sprintf("Cannot have a EndCompTask Message Before a StartCompTaks Message, taskId: %s", msg.TaskId))
		}

		if msg.Data != nil {
			state.addTaskData(msg.TaskId, msg.MsgType, msg.Data)
		}

		state.taskState[msg.TaskId] = state.taskState[msg.TaskId] | CompTaskCompleted
	}

	return nil
}