// saga/saga.go
// Package saga provides a generic implementation of the Saga pattern.
// Scoot uses the Saga pattern to track the state of long-lived Jobs
// for reporting and failure recovery.
// For info on the Saga pattern, see:
// https://speakerdeck.com/caitiem20/applying-the-saga-pattern
package saga

import (
	"fmt"
	"sync"

	"github.com/twitter/scoot/common"
	"github.com/twitter/scoot/common/stats"
)
// Saga is a concurrent object representing a single saga. Its methods
// update the state of the saga and provide access to the current state.
type Saga struct {
id string
log SagaLog
state *SagaState
updateCh chan sagaUpdate
mutex sync.RWMutex // mutex controls access to Saga.state
chMutex sync.RWMutex // controls send and close of channel
closed bool
stat stats.StatsReceiver
}
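// concurrentTasksSketch is a hypothetical sketch (not part of the original file) showing
// that Saga methods are intended to be safe to call from multiple goroutines: updates are
// funneled through updateCh and applied in order by a single updateSagaStateLoop goroutine,
// while reads of state go through the RWMutex. taskIds is assumed to be supplied by the caller.
func concurrentTasksSketch(s *Saga, taskIds []string) {
	var wg sync.WaitGroup
	for _, taskId := range taskIds {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if err := s.StartTask(id, nil); err != nil {
				return
			}
			_ = s.EndTask(id, nil)
		}(taskId)
	}
	wg.Wait()
}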
// newSaga starts a new saga: it logs a StartSaga message to the SagaLog
// and returns a Saga, or an error if one occurs.
func newSaga(sagaId string, job []byte, log SagaLog, stat stats.StatsReceiver) (*Saga, error) {
state, err := makeSagaState(sagaId, job)
if err != nil {
return nil, err
}
err = log.StartSaga(sagaId, job)
if err != nil {
return nil, err
}
updateCh := make(chan sagaUpdate, common.DefaultSagaUpdateChSize)
s := &Saga{
id: sagaId,
log: log,
state: state,
updateCh: updateCh,
mutex: sync.RWMutex{},
chMutex: sync.RWMutex{},
stat: stat,
}
go s.updateSagaStateLoop()
return s, nil
}
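// runJobSketch is a hypothetical, minimal sketch (not part of the original file) of the
// forward path: start a saga, record one task, and end the saga. The SagaLog and
// StatsReceiver are assumed to be supplied by the caller (e.g. an in-memory log in tests),
// and "task1" is an arbitrary task id chosen for the sketch.
func runJobSketch(sagaId string, job []byte, log SagaLog, stat stats.StatsReceiver) error {
	s, err := newSaga(sagaId, job, log, stat)
	if err != nil {
		return err
	}
	if err := s.StartTask("task1", nil); err != nil {
		return err
	}
	if err := s.EndTask("task1", nil); err != nil {
		return err
	}
	// once an EndSaga message has been processed, the saga's update channel is
	// closed and no further messages can be logged
	return s.EndSaga()
}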
// rehydrateSaga rehydrates a saga from a specified SagaState. It does not
// write to the SagaLog, since it assumes this is a recovered saga.
func rehydrateSaga(sagaId string, state *SagaState, log SagaLog, stat stats.StatsReceiver) *Saga {
updateCh := make(chan sagaUpdate, common.DefaultSagaUpdateChSize)
s := &Saga{
id: sagaId,
log: log,
state: state,
updateCh: updateCh,
mutex: sync.RWMutex{},
chMutex: sync.RWMutex{},
stat: stat,
}
if !state.IsSagaCompleted() {
go s.updateSagaStateLoop()
}
return s
}
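// resumeSketch is a hypothetical sketch (not part of the original file) of how a recovered
// saga might be handled after rehydration: completed sagas need no further work, while
// incomplete ones are aborted here so compensating tasks can be applied. Aborting is a
// rollback-recovery policy assumed by this sketch, not something this package mandates.
func resumeSketch(sagaId string, state *SagaState, log SagaLog, stat stats.StatsReceiver) error {
	s := rehydrateSaga(sagaId, state, log, stat)
	if s.GetState().IsSagaCompleted() {
		return nil // nothing left to do; no update loop was started for this saga
	}
	return s.AbortSaga()
}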
func (s *Saga) ID() string {
return s.id
}
// GetState returns a copy of the current SagaState.
func (s *Saga) GetState() *SagaState {
s.mutex.RLock()
defer s.mutex.RUnlock()
return copySagaState(s.state)
}
// EndSaga logs an EndSaga message to the log. Returns an error if it fails.
//
// Once EndSaga has been called, the saga's update channel is closed and
// trying to log additional messages will result in an error.
func (s *Saga) EndSaga() error {
return s.updateSagaState([]SagaMessage{MakeEndSagaMessage(s.id)})
}
// AbortSaga logs an AbortSaga message. This indicates that the saga has
// failed, all execution should be stopped, and compensating transactions
// should be applied.
//
// Returns an error if it fails.
func (s *Saga) AbortSaga() error {
return s.updateSagaState([]SagaMessage{MakeAbortSagaMessage(s.id)})
}
// StartTask logs a StartTask message to the log.
//
// StartTask is idempotent with respect to sagaId & taskId. If the data
// passed changes, the last written StartTask message wins.
//
// Returns an error if it fails.
func (s *Saga) StartTask(taskId string, data []byte) error {
defer s.stat.Latency(stats.SagaStartOrEndTaskLatency_ms).Time().Stop()
return s.updateSagaState([]SagaMessage{MakeStartTaskMessage(s.id, taskId, data)})
}
// EndTask logs an EndTask message to the log, indicating that this task
// has been successfully completed.
//
// EndTask is idempotent with respect to sagaId & taskId. If the data
// passed changes, the last written EndTask message wins.
//
// Returns an error if it fails.
func (s *Saga) EndTask(taskId string, results []byte) error {
defer s.stat.Latency(stats.SagaStartOrEndTaskLatency_ms).Time().Stop()
return s.updateSagaState([]SagaMessage{MakeEndTaskMessage(s.id, taskId, results)})
}
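// execTaskSketch is a hypothetical sketch (not part of the original file) showing the
// intended StartTask/EndTask pairing around real work, with FatalErr used to classify a
// failed log write. The run callback stands in for the actual task body and is assumed
// to be provided by the caller.
func execTaskSketch(s *Saga, taskId string, data []byte, run func([]byte) ([]byte, error)) error {
	if err := s.StartTask(taskId, data); err != nil {
		if FatalErr(err) {
			return err // unrecoverable: do not retry
		}
		// transient log error: StartTask is idempotent per sagaId & taskId, so the
		// caller may retry with the same arguments; this sketch just gives up
		return err
	}
	results, err := run(data)
	if err != nil {
		return err
	}
	return s.EndTask(taskId, results)
}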
// StartCompensatingTask logs a StartCompTask message to the log. It should
// only be logged after a saga has been aborted, in Rollback Recovery mode.
// It should not be used in Forward Recovery mode.
//
// StartCompTask is idempotent with respect to sagaId & taskId. If the data
// passed changes, the last written StartCompTask message wins.
//
// Returns an error if it fails.
func (s *Saga) StartCompensatingTask(taskId string, data []byte) error {
return s.updateSagaState([]SagaMessage{MakeStartCompTaskMessage(s.id, taskId, data)})
}
// EndCompensatingTask logs an EndCompTask message to the log when a
// compensating task has been successfully completed.
//
// EndCompTask is idempotent with respect to sagaId & taskId. If the data
// passed changes, the last written EndCompTask message wins.
//
// Returns an error if it fails.
func (s *Saga) EndCompensatingTask(taskId string, results []byte) error {
return s.updateSagaState([]SagaMessage{MakeEndCompTaskMessage(s.id, taskId, results)})
}
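// rollbackSketch is a hypothetical sketch (not part of the original file) of rollback
// recovery: after AbortSaga, a compensating task is logged and completed for each task
// that had started. The startedTasks slice and compensate callback are assumptions of
// this sketch (e.g. derived from the recovered SagaState by the caller).
func rollbackSketch(s *Saga, startedTasks []string, compensate func(taskId string) ([]byte, error)) error {
	if err := s.AbortSaga(); err != nil {
		return err
	}
	for _, taskId := range startedTasks {
		if err := s.StartCompensatingTask(taskId, nil); err != nil {
			return err
		}
		results, err := compensate(taskId)
		if err != nil {
			return err
		}
		if err := s.EndCompensatingTask(taskId, results); err != nil {
			return err
		}
	}
	return s.EndSaga()
}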
// BulkMessage takes a slice of SagaMessages to be applied.
// The messages update the saga state and log in the order given.
// The update is done "atomically", within a single
// Saga mutex lock. Note that the underlying log update will be
// SagaLog implementation dependent.
func (s *Saga) BulkMessage(messages []SagaMessage) error {
return s.updateSagaState(messages)
}
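// bulkTaskSketch is a hypothetical sketch (not part of the original file) showing how a
// task that is already known to be finished could be recorded with a single BulkMessage
// call instead of separate StartTask/EndTask calls.
func bulkTaskSketch(s *Saga, taskId string, data, results []byte) error {
	msgs := []SagaMessage{
		MakeStartTaskMessage(s.ID(), taskId, data),
		MakeEndTaskMessage(s.ID(), taskId, results),
	}
	// both messages are validated and logged under one Saga mutex lock; how the
	// batch is persisted depends on the SagaLog implementation
	return s.BulkMessage(msgs)
}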
// updateSagaState adds an update for updateSagaStateLoop to execute to this
// saga's update channel and blocks until the messages have been applied.
func (s *Saga) updateSagaState(msgs []SagaMessage) error {
	resultCh := make(chan error)
	s.chMutex.RLock()
	if s.closed {
		s.chMutex.RUnlock()
		return fmt.Errorf("trying to update saga %s that has a closed channel", s.id)
	}
s.updateCh <- sagaUpdate{
msgs: msgs,
resultCh: resultCh,
}
s.chMutex.RUnlock()
result := <-resultCh
	// once an EndSaga message has been processed, close the channel;
	// no more messages should be logged
for _, msg := range msgs {
if msg.MsgType == EndSaga {
s.chMutex.Lock()
close(s.updateCh)
s.closed = true
s.chMutex.Unlock()
break
}
}
return result
}
// updateSagaStateLoop is executed inside a single goroutine; there is one per
// currently executing saga. This ensures all updates are applied to a saga in
// order, and it controls access to the SagaState so that it is updated in a
// thread-safe manner.
// Processes at most common.DefaultSagaUpdateChSize updates at a time.
func (s *Saga) updateSagaStateLoop() {
var loopLatency = s.stat.Latency(stats.SagaUpdateStateLoopLatency_ms).Time()
updates := []sagaUpdate{}
for update := range s.updateCh {
updates = append(updates, update)
if len(s.updateCh) == 0 || len(updates) == common.DefaultSagaUpdateChSize {
loopLatency.Stop()
s.stat.Histogram(stats.SagaNumUpdatesProcessed).Update(int64(len(updates)))
s.updateSaga(updates)
updates = []sagaUpdate{}
loopLatency.Time()
}
}
}
// updateSaga applies updates atomically, under the Saga mutex, and sends the
// result to each requester. It checks that each update's message(s) form a
// valid transition and batch-logs the messages from all valid updates durably
// to the SagaLog. Invalid transitions are neither logged nor applied to the
// internal state, and if logging fails the state is reverted to the previous
// (copied) state.
func (s *Saga) updateSaga(updates []sagaUpdate) {
defer s.stat.Latency(stats.SagaUpdateStateLatency_ms).Time().Stop()
s.mutex.Lock()
defer s.mutex.Unlock()
// updateSaga will mutate state if it's a valid transition, but if we then error logging,
// we'll need to revert to old state
oldState := copySagaState(s.state)
var err error
var validUpdates []sagaUpdate
var validUpdateMsgs []SagaMessage
for _, update := range updates {
err = nil
// verify that the applied message(s) results in a valid state
if len(update.msgs) == 1 {
err = updateSagaState(s.state, update.msgs[0])
} else {
s.state, err = bulkUpdateSagaState(s.state, update.msgs)
}
// don't log messages for invalid updates
if err != nil {
update.resultCh <- err
continue
}
validUpdates = append(validUpdates, update)
validUpdateMsgs = append(validUpdateMsgs, update.msgs...)
}
	// batch log valid messages (if present) from all updates
if len(validUpdateMsgs) == 0 {
return
}
err = logMessages(validUpdateMsgs, s.log)
if err != nil {
s.state = oldState
}
	// forward the result to all the update result channels;
	// all updates will encounter the same error/success from logging
for _, update := range validUpdates {
update.resultCh <- err
}
}
type sagaUpdate struct {
msgs []SagaMessage
resultCh chan error
}
// logMessages durably stores the messages in the SagaLog, returning any error
// reported by the log.
func logMessages(msgs []SagaMessage, log SagaLog) error {
var err error
if len(msgs) == 1 {
err = log.LogMessage(msgs[0])
} else {
err = log.LogBatchMessages(msgs)
}
return err
}
// FatalErr checks an error returned by updating saga state.
// Returns true if the error is fatal and unrecoverable.
// Returns false if the error is transient and a retry might succeed.
func FatalErr(err error) bool {
switch err.(type) {
// InvalidSagaState is an unrecoverable error. This indicates a fatal bug in the code
// which is asking for an impossible transition.
case InvalidSagaStateError:
return true
// InvalidSagaMessage is an unrecoverable error. This indicates a fatal bug in the code
// which is applying invalid parameters to a saga.
case InvalidSagaMessageError:
return true
// InvalidRequestError is an unrecoverable error. This indicates a fatal bug in the code
// where the SagaLog cannot durably store messages
case InvalidRequestError:
return true
// InternalLogError is a transient error experienced by the log. It was not
// able to durably store the message but it is ok to retry.
case InternalLogError:
return false
// SagaLog is in a bad state, this indicates a fatal bug and no more progress
// can be made on this saga.
case CorruptedSagaLogError:
return true
	// unknown error types default to fatal (not retryable).
default:
return true
}
}
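// retrySketch is a hypothetical sketch (not part of the original file) of how callers are
// expected to use FatalErr: transient errors (e.g. InternalLogError) may be retried, fatal
// ones must not be. The update callback and maxRetries value are assumptions of the sketch.
func retrySketch(update func() error) error {
	const maxRetries = 3
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = update(); err == nil || FatalErr(err) {
			return err
		}
		// transient error: try the same update again
	}
	return err
}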