worker/domain/api.go (174 lines of code) (raw):

package domain import ( "time" "github.com/twitter/scoot/common/errors" "github.com/twitter/scoot/common/log/helpers" "github.com/twitter/scoot/common/log/tags" "github.com/twitter/scoot/common/thrifthelpers" "github.com/twitter/scoot/runner" "github.com/twitter/scoot/worker/domain/gen-go/worker" ) // // Translation between local domain objects and thrift objects: // // TODO: test workerStatus. type WorkerStatus struct { Runs []runner.RunStatus Initialized bool IsHealthy bool Error string } func ThriftWorkerStatusToDomain(thrift *worker.WorkerStatus) WorkerStatus { runs := make([]runner.RunStatus, 0) for _, r := range thrift.Runs { runs = append(runs, ThriftRunStatusToDomain(r)) } return WorkerStatus{runs, thrift.Initialized, thrift.IsHealthy, thrift.Error} } func DomainWorkerStatusToThrift(domain WorkerStatus) *worker.WorkerStatus { thrift := worker.NewWorkerStatus() thrift.Runs = make([]*worker.RunStatus, 0) for _, r := range domain.Runs { thrift.Runs = append(thrift.Runs, DomainRunStatusToThrift(r)) thrift.Initialized = domain.Initialized thrift.IsHealthy = domain.IsHealthy thrift.Error = domain.Error } return thrift } func ThriftRunCommandToDomain(thrift *worker.RunCommand) *runner.Command { argv := make([]string, 0) env := make(map[string]string) timeout := time.Duration(0) snapshotID := "" jobID := "" taskID := "" tag := "" if thrift.Argv != nil { argv = thrift.Argv } if thrift.Env != nil { env = thrift.Env } if thrift.TimeoutMs != nil { timeout = time.Millisecond * time.Duration(*thrift.TimeoutMs) } if thrift.SnapshotId != nil { snapshotID = *thrift.SnapshotId } if thrift.TaskId != nil { taskID = *thrift.TaskId } if thrift.JobId != nil { jobID = *thrift.JobId } if thrift.Tag != nil { tag = *thrift.Tag } return &runner.Command{ Argv: argv, EnvVars: env, Timeout: timeout, SnapshotID: snapshotID, LogTags: tags.LogTags{ JobID: jobID, TaskID: taskID, Tag: tag, }, } } func DomainRunCommandToThrift(domain *runner.Command) *worker.RunCommand { thrift := worker.NewRunCommand() timeoutMs := int32(domain.Timeout / time.Millisecond) thrift.TimeoutMs = &timeoutMs thrift.Env = domain.EnvVars thrift.Argv = domain.Argv snapID := domain.SnapshotID thrift.SnapshotId = &snapID jobID := domain.JobID thrift.JobId = &jobID taskID := domain.TaskID thrift.TaskId = &taskID tag := domain.Tag thrift.Tag = &tag return thrift } func ThriftRunStatusToDomain(thrift *worker.RunStatus) runner.RunStatus { domain := runner.RunStatus{} domain.RunID = runner.RunID(thrift.RunId) switch thrift.Status { case worker.Status_UNKNOWN: domain.State = runner.UNKNOWN case worker.Status_PENDING: domain.State = runner.PENDING case worker.Status_RUNNING: domain.State = runner.RUNNING case worker.Status_COMPLETE: domain.State = runner.COMPLETE case worker.Status_FAILED: domain.State = runner.FAILED case worker.Status_ABORTED: domain.State = runner.ABORTED case worker.Status_TIMEDOUT: domain.State = runner.TIMEDOUT } if thrift.OutUri != nil { domain.StdoutRef = *thrift.OutUri } if thrift.ErrUri != nil { domain.StderrRef = *thrift.ErrUri } if thrift.Error != nil { domain.Error = *thrift.Error } if thrift.ExitCode != nil { domain.ExitCode = errors.ExitCode(*thrift.ExitCode) } if thrift.SnapshotId != nil { domain.SnapshotID = *thrift.SnapshotId } if thrift.JobId != nil { domain.JobID = *thrift.JobId } if thrift.TaskId != nil { domain.TaskID = *thrift.TaskId } if thrift.Tag != nil { domain.Tag = *thrift.Tag } return domain } func DomainRunStatusToThrift(domain runner.RunStatus) *worker.RunStatus { thrift := worker.NewRunStatus() thrift.RunId = string(domain.RunID) switch domain.State { case runner.UNKNOWN: thrift.Status = worker.Status_UNKNOWN case runner.PENDING: thrift.Status = worker.Status_PENDING case runner.RUNNING: thrift.Status = worker.Status_RUNNING case runner.COMPLETE: thrift.Status = worker.Status_COMPLETE case runner.FAILED: thrift.Status = worker.Status_FAILED case runner.ABORTED: thrift.Status = worker.Status_ABORTED case runner.TIMEDOUT: thrift.Status = worker.Status_TIMEDOUT } thrift.OutUri = helpers.CopyStringToPointer(domain.StdoutRef) thrift.ErrUri = helpers.CopyStringToPointer(domain.StderrRef) thrift.Error = helpers.CopyStringToPointer(domain.Error) exitCode := int32(domain.ExitCode) thrift.ExitCode = &exitCode thrift.SnapshotId = helpers.CopyStringToPointer(domain.SnapshotID) thrift.JobId = helpers.CopyStringToPointer(domain.JobID) thrift.TaskId = helpers.CopyStringToPointer(domain.TaskID) thrift.Tag = helpers.CopyStringToPointer(domain.Tag) return thrift } func SerializeProcessStatus(processStatus runner.RunStatus) ([]byte, error) { runStatus := DomainRunStatusToThrift(processStatus) asBytes, err := thrifthelpers.JsonSerialize(runStatus) if err != nil { return nil, err } return asBytes, err }