saga/sagalogs/memory.go (127 lines of code) (raw):
// Package sagalogs provides implementations of SagaLog.
package sagalogs
import (
"errors"
"fmt"
"sync"
"time"
"github.com/twitter/scoot/common/stats"
"github.com/twitter/scoot/saga"
log "github.com/sirupsen/logrus"
)
// In Memory Implementation of a SagaLog, DOES NOT durably persist messages.
type inMemorySagaLog struct {
sagas map[string]*logData
mutex sync.RWMutex
gcExpiration time.Duration
gcTicker *time.Ticker
}
// wrapper for SagaMessages tracking timestamps for GC
type logData struct {
messages []saga.SagaMessage
created time.Time
}
// Returns an Instance of a Saga based on an inMemorySagaLog.
// This is an in-memory impl, and is not durable.
// Implements GC of whole sagas based on time expiration regardless of Saga state.
// GC should be set at a duration that realistically will not purge active Sagas.
// gcExpiration: duration after which a Saga was created, it will be deleted.
// A zero duration is interpretted as "never gc" (the Log will eventually consume all memory).
// gcInterval: duration interval at which GC runs.
func MakeInMemorySagaCoordinator(gcExpiration time.Duration, gcInterval time.Duration, stat stats.StatsReceiver) saga.SagaCoordinator {
return saga.MakeSagaCoordinator(MakeInMemorySagaLog(gcExpiration, gcInterval), stat)
}
// Make an InMemorySagaLog with specified GC expiration and interval duration.
func MakeInMemorySagaLog(gcExpiration time.Duration, gcInterval time.Duration) saga.SagaLog {
slog := &inMemorySagaLog{
sagas: make(map[string]*logData),
mutex: sync.RWMutex{},
gcExpiration: gcExpiration,
}
if gcExpiration != 0 {
slog.gcTicker = time.NewTicker(gcInterval)
go func() {
for range slog.gcTicker.C {
if err := slog.gcSagas(); err != nil {
log.Errorf("Error running gcSagas: %s", err)
}
}
}()
}
return slog
}
// Shorthand creator function to create a non-GCing SagaLog with Coordinator
func MakeInMemorySagaCoordinatorNoGC(stat stats.StatsReceiver) saga.SagaCoordinator {
return saga.MakeSagaCoordinator(MakeInMemorySagaLogNoGC(), stat)
}
// Shorthand creator function to create a non-GCing SagaLog
func MakeInMemorySagaLogNoGC() saga.SagaLog {
return MakeInMemorySagaLog(0, 0)
}
// Creates a Saga in the log and adds a StartSagaMessage to it
func (slog *inMemorySagaLog) StartSaga(sagaId string, job []byte) error {
slog.mutex.Lock()
defer slog.mutex.Unlock()
startMsg := saga.MakeStartSagaMessage(sagaId, job)
slog.sagas[sagaId] = &logData{messages: []saga.SagaMessage{startMsg}, created: time.Now()}
return nil
}
// Log a SagaMessage to an existing Saga in the log
func (slog *inMemorySagaLog) LogMessage(msg saga.SagaMessage) error {
return slog.logMessages([]saga.SagaMessage{msg})
}
// Log a batch of messages in one transaction. Assumes messages are for the same saga.
func (slog *inMemorySagaLog) LogBatchMessages(msgs []saga.SagaMessage) error {
return slog.logMessages(msgs)
}
func (slog *inMemorySagaLog) logMessages(msgs []saga.SagaMessage) error {
if len(msgs) == 0 {
return errors.New("Empty messages slice passed to logMessages")
}
slog.mutex.Lock()
defer slog.mutex.Unlock()
sagaId := msgs[0].SagaId
ld, ok := slog.sagas[sagaId]
if !ok {
return errors.New(fmt.Sprintf("Saga: %s does not exist in the Log", sagaId))
}
ld.messages = append(ld.messages, msgs...)
return nil
}
// Gets all SagaMessages from an existing Saga in the log
func (slog *inMemorySagaLog) GetMessages(sagaId string) ([]saga.SagaMessage, error) {
slog.mutex.RLock()
defer slog.mutex.RUnlock()
ld, ok := slog.sagas[sagaId]
if ok {
return ld.messages, nil
} else {
return nil, nil
}
}
// Returns all non-GCd SagaIds existing since this SagaLog was created.
// Includes Sagas of any state (completed, active, etc).
func (slog *inMemorySagaLog) GetActiveSagas() ([]string, error) {
slog.mutex.RLock()
defer slog.mutex.RUnlock()
keys := make([]string, len(slog.sagas))
i := 0
for key := range slog.sagas {
keys[i] = key
i++
}
return keys, nil
}
// Check for expired Sagas and then delete them.
// Sagas need not be completed to be GCd.
func (slog *inMemorySagaLog) gcSagas() error {
expired := slog.getExpiredSagaIds()
if len(expired) == 0 {
return nil
}
slog.mutex.Lock()
defer slog.mutex.Unlock()
for _, id := range expired {
delete(slog.sagas, id)
}
return nil
}
func (slog *inMemorySagaLog) getExpiredSagaIds() []string {
slog.mutex.RLock()
defer slog.mutex.RUnlock()
expired := []string{}
for id, ld := range slog.sagas {
if time.Since(ld.created) >= slog.gcExpiration {
expired = append(expired, id)
}
}
return expired
}
// Private utility function for testing only
func (slog *inMemorySagaLog) setSagaCreatedTime(sagaId string, created time.Time) {
slog.mutex.Lock()
defer slog.mutex.Unlock()
ld, ok := slog.sagas[sagaId]
if !ok {
return
}
ld.created = created
}