saga/sagalogs/file.go (257 lines of code) (raw):
package sagalogs
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"time"
"github.com/twitter/scoot/saga"
)
// Writes Saga Log to file system. Not durable beyond machine failure
// Sagas are stored in a directory. Each saga has a corresponding directory
// each saga directory contains a log file, and associated data files.
// StartSaga Message
// StartSaga \n
// job data filename \n
// StartTask Message
// StartTask \n
// taskId \n
// taskData filename \n
// EndTask Message
// EndTask \n
// taskId \n
// taskData filename \n
// AbortSaga Message
// AbortSaga \n
// StartCompTask Message
// StartCompTask \n
// taskId \n
// taskData filename \n
// EndCompTask Message
// EndComptTask \n
// taskId \n
// taskData filename \n
// EndSaga Message
// EndSaga
type fileSagaLog struct {
dirName string
}
// Creates a FileSagaLog with files stored at the specified directory
// If the directory does not exist it will create it.
func MakeFileSagaLog(dirName string) (*fileSagaLog, error) {
if err := os.MkdirAll(dirName, os.ModePerm); err != nil {
return nil, err
}
return &fileSagaLog{
dirName: dirName,
}, nil
}
// all files for a saga log are stored in a directory named
// by the specified sagaId.
func (log *fileSagaLog) getSagaDirectory(sagaId string) string {
return path.Join(log.dirName, sagaId)
}
// Returns the name of the sagalog for the specified file.
func (log *fileSagaLog) getSagaLogFileName(sagaId string) string {
return path.Join(log.getSagaDirectory(sagaId), "log")
}
// Returns the name of the file to store task data in
// SagaId for the saga. Not deterministic.
// TaskId corresponding to taskdata
// SagaMessageType type of SagaMessage
func (log *fileSagaLog) createTaskDataFileName(
sagaId string,
taskId string,
msgType saga.SagaMessageType) string {
//format sagaDir/MsgType_taskId_data_timestamp
fileName := fmt.Sprintf("%v_%v_data_%v",
msgType.String(),
taskId,
time.Now().Format(time.StampMilli),
)
return path.Join(log.getSagaDirectory(sagaId), fileName)
}
// Returns the name of the file to store Job Data in based on the
// SagaId. Not deterministic.
func (log *fileSagaLog) createJobDataFileName(sagaId string) string {
//format sagaDir/StartSagaData_timestamp
fileName := fmt.Sprintf(
"StartSagaData_%v", time.Now().Format(time.StampNano))
return path.Join(log.getSagaDirectory(sagaId), fileName)
}
// Log a Start Saga Message message to the log.
// Returns an error if it fails.
func (log *fileSagaLog) StartSaga(sagaId string, job []byte) error {
// Create directory for this saga if it doesn't exist
dirName := log.getSagaDirectory(sagaId)
if _, err := os.Stat(dirName); err != nil {
if os.IsNotExist(err) {
err = os.Mkdir(dirName, os.ModePerm)
if err != nil {
return err
}
} else {
return err
}
}
// Write Data File
dataFileName := log.createJobDataFileName(sagaId)
err := ioutil.WriteFile(dataFileName, job, os.ModePerm)
if err != nil {
return err
}
// Write Message to Log File
var logFile *os.File
defer logFile.Close()
logFileName := log.getSagaLogFileName(sagaId)
// Append StartSaga message to the log
// Get File Handle for Saga Create it if it doesn't exist
if _, err = os.Stat(logFileName); err != nil {
if os.IsNotExist(err) {
logFile, err = os.Create(logFileName)
if err != nil {
return err
}
} else {
return err
}
} else {
logFile, err = os.OpenFile(logFileName, os.O_APPEND|os.O_RDWR, os.ModePerm)
if err != nil {
return err
}
}
// write log message
msg := []byte(fmt.Sprintf("%v\n%v\n",
saga.StartSaga.String(),
dataFileName))
_, err = logFile.Write(msg)
if err != nil {
return err
}
logFile.Sync()
return nil
}
// Update the State of the Saga by Logging a message.
// Returns an error if it fails.
func (log *fileSagaLog) LogMessage(message saga.SagaMessage) error {
return log.logMessages([]saga.SagaMessage{message})
}
// Log a batch of messages in one transaction. Assumes messages are for the same saga.
func (log *fileSagaLog) LogBatchMessages(msgs []saga.SagaMessage) error {
return log.logMessages(msgs)
}
func (log *fileSagaLog) logMessages(msgs []saga.SagaMessage) error {
if len(msgs) == 0 {
return errors.New("Empty messages slice passed to logMessages")
}
fileName := log.getSagaLogFileName(msgs[0].SagaId)
// Get file handle for Saga if it doesn't exist return error,
// Saga wasn't started. OpenFile so we can append to it
logFile, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR, os.ModePerm)
defer logFile.Close()
if err != nil {
return err
}
for _, message := range msgs {
// Write MessageType
msg := []byte(fmt.Sprintf("%v\n", message.MsgType.String()))
// If its a Task Type Write the TaskId and Data
if message.MsgType == saga.StartTask ||
message.MsgType == saga.EndTask ||
message.MsgType == saga.StartCompTask ||
message.MsgType == saga.EndCompTask {
// write task data to file
dataFileName := log.createTaskDataFileName(
message.SagaId, message.TaskId, message.MsgType)
err = ioutil.WriteFile(dataFileName, message.Data, os.ModePerm)
if err != nil {
return err
}
// update log message
msg = append(msg, []byte(
fmt.Sprintf("%v\n%v\n",
message.TaskId,
dataFileName))...)
}
_, err = logFile.Write(msg)
if err != nil {
return err
}
}
logFile.Sync()
return nil
}
// Returns all of the messages logged so far for the
// specified saga.
func (log *fileSagaLog) GetMessages(sagaId string) ([]saga.SagaMessage, error) {
fileName := log.getSagaLogFileName(sagaId)
// check if this saga actually exists
if _, err := os.Stat(fileName); err != nil {
return nil, nil
}
logFile, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer logFile.Close()
msgs := make([]saga.SagaMessage, 0)
scanner := bufio.NewScanner(logFile)
nextToken := scanner.Scan()
for nextToken == true {
msg, err := parseMessage(sagaId, scanner)
if err != nil {
return nil, err
}
msgs = append(msgs, msg)
nextToken = scanner.Scan()
}
return msgs, nil
}
// Helper Function that Parses a SagaMessage. Returns a message if successfully parsed
// Returns and error otherwise
func parseMessage(sagaId string, scanner *bufio.Scanner) (saga.SagaMessage, error) {
switch scanner.Text() {
// Parse Start Saga Message
case saga.StartSaga.String():
if ok := scanner.Scan(); !ok {
return saga.SagaMessage{}, saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Parsing SagaLog expected Data after StartSaga message. Error: %v",
createUnexpectedScanEndMsg(scanner)),
)
}
dataFileName := scanner.Text()
data, err := ioutil.ReadFile(dataFileName)
if err != nil {
return saga.SagaMessage{},
saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Reading DataFile %v, Error: %v", dataFileName, err),
)
}
return saga.MakeStartSagaMessage(sagaId, data), nil
// Parse End Saga Message
case saga.EndSaga.String():
return saga.MakeEndSagaMessage(sagaId), nil
// Parse Abort Saga Message
case saga.AbortSaga.String():
return saga.MakeAbortSagaMessage(sagaId), nil
// Parse Start Task Message
case saga.StartTask.String():
taskId, data, err := parseTask(sagaId, scanner)
if err != nil {
return saga.SagaMessage{}, err
}
return saga.MakeStartTaskMessage(sagaId, taskId, data), nil
// Parse End Task Message
case saga.EndTask.String():
taskId, data, err := parseTask(sagaId, scanner)
if err != nil {
return saga.SagaMessage{}, err
}
return saga.MakeEndTaskMessage(sagaId, taskId, data), nil
// Parse Start Comp Task Message
case saga.StartCompTask.String():
taskId, data, err := parseTask(sagaId, scanner)
if err != nil {
return saga.SagaMessage{}, err
}
return saga.MakeStartCompTaskMessage(sagaId, taskId, data), nil
// Parse End Comp Task Message
case saga.EndCompTask.String():
taskId, data, err := parseTask(sagaId, scanner)
if err != nil {
return saga.SagaMessage{}, err
}
return saga.MakeEndCompTaskMessage(sagaId, taskId, data), nil
// Unrecognized Message
default:
return saga.SagaMessage{}, saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Parsing SagaLog unrecognized message type, %v", scanner.Text()),
)
}
}
// Helper function that parses a task message, StartTask, EndTask, StartCompTask,
// EndCompTasks. Message is of structure
// line1: MessageType
// line2: TaskId
// line3: TaskData FileName
// Returns a tuple of TaskId, TaskData, Error
func parseTask(sagaId string, scanner *bufio.Scanner) (string, []byte, error) {
// read taskId and datafileName
if ok := scanner.Scan(); !ok {
return "", nil,
saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Parsing SagaLog expected TaskId, Error: %v",
createUnexpectedScanEndMsg(scanner)),
)
}
taskId := scanner.Text()
if ok := scanner.Scan(); !ok {
return "", nil,
saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Parsing SagaLog expected Data, Error: %v",
createUnexpectedScanEndMsg(scanner)),
)
}
dataFileName := scanner.Text()
data, err := ioutil.ReadFile(dataFileName)
if err != nil {
return "", nil,
saga.NewCorruptedSagaLogError(
sagaId,
fmt.Sprintf("Error Reading DataFile %v, Error: %v", dataFileName, err),
)
}
return taskId, data, nil
}
func createUnexpectedScanEndMsg(scanner *bufio.Scanner) string {
var errMsg string
if scanner.Err() != nil {
errMsg = scanner.Err().Error()
} else {
errMsg = "Unexpected EOF"
}
return errMsg
}
//
// Returns a list of all in progress sagaIds.
// This MUST include all not completed sagaIds.
// It may also included completed sagas
// Returns an error if it fails.
//
// This is a very dumb implementation currently just returns
// a list of all sagas. This can be improved with an index
//
func (log *fileSagaLog) GetActiveSagas() ([]string, error) {
files, err := ioutil.ReadDir(log.dirName)
if err != nil {
return nil, err
}
sagaIds := make([]string, len(files))
for i, file := range files {
sagaIds[i] = file.Name()
}
return sagaIds, nil
}