runner/runners/status_manager.go (160 lines of code) (raw):
package runners
import (
"fmt"
"strconv"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/twitter/scoot/runner"
)
// UnknownRunIDMsg is the format string (its single verb receives the run ID) used to build
// the error returned when a query names a run ID that is not present in the StatusManager.
const UnknownRunIDMsg = "unknown run id %v"
// NewStatusManager builds a StatusManager that holds no runs and has no listeners.
// capacity bounds how many runs NewRun keeps before evicting the oldest one; a capacity
// of 0 disables eviction.
func NewStatusManager(capacity int) *StatusManager {
	sm := &StatusManager{capacity: capacity}
	sm.runs = make(map[runner.RunID]runner.RunStatus)
	sm.fifo = make([]runner.RunID, 0)
	return sm
}
// StatusManager is an in-memory database of RunStatus'es. It allows clients to write statuses
// (NewRun, Update, UpdateService), query the current status (Query, QueryNow, Status, StatusAll),
// and block waiting for a status update that matches a query.
// NOTE(review): the previous comment said this type "implements runner.RunStatus"; RunStatus is
// used as a plain value type throughout this file, and the reader section below states that it
// implements runner.StatusQuerier — confirm which interfaces are intended.
type StatusManager struct {
// mu guards every field below; methods take it before reading or writing them.
mu sync.RWMutex
// runs maps each known run ID to the most recently stored status for that run.
runs map[runner.RunID]runner.RunStatus
// fifo lists run IDs in the order NewRun created them; its head is the next run to evict.
fifo []runner.RunID
// capacity is the number of runs kept before NewRun evicts the head of fifo; 0 disables eviction.
capacity int
// svcStatus is the last value passed to UpdateService (zero value until then).
svcStatus runner.ServiceStatus
// nextRunID is the counter NewRun formats as a decimal string to mint the next RunID.
nextRunID int64
// listeners holds the queries currently waiting for a matching Update.
listeners []queryAndCh
}
// queryAndCh pairs a pending query with the channel on which its waiter is notified.
type queryAndCh struct {
// q is the query that an updated status must match for this listener to fire.
q runner.Query
// ch receives the first matching status from Update and is closed right after that send.
ch chan runner.RunStatus
}
// Writer interface
// NewRun mints the next RunID, records it in state PENDING and returns that status.
// RunIDs are the base-10 rendering of a counter that grows by one per call. If a non-zero
// capacity is configured and recording this run pushes the number of tracked runs past it,
// the run at the head of the FIFO is forgotten. The returned error is always nil.
func (s *StatusManager) NewRun() (runner.RunStatus, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	runID := runner.RunID(strconv.FormatInt(s.nextRunID, 10))
	s.nextRunID++

	status := runner.RunStatus{RunID: runID, State: runner.PENDING}
	s.runs[runID] = status
	s.fifo = append(s.fifo, runID)

	// A capacity of 0 means "never evict".
	if overCapacity := s.capacity != 0 && len(s.fifo) > s.capacity; overCapacity {
		head := s.fifo[0]
		s.fifo = s.fifo[1:]
		delete(s.runs, head)
	}
	return status, nil
}
// UpdateService records svcStatus as the overall service status, independently of any
// individual run's status, and logs the new value. The returned error is always nil.
func (s *StatusManager) UpdateService(svcStatus runner.ServiceStatus) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	fields := log.Fields{"svcStatus": svcStatus}
	log.WithFields(fields).Info("StatusManager updating svc")

	s.svcStatus = svcStatus
	return nil
}
// Update stores newStatus as the latest status of its run and wakes any listeners whose
// query matches it.
//
// Rules enforced:
//   - updating a run ID that is not tracked is an error;
//   - once the stored status is Done, further updates are silently ignored (nil is returned);
//   - empty StdoutRef/StderrRef in newStatus never erase previously stored refs.
//
// Each matching listener receives newStatus exactly once; its channel is then closed and the
// listener is dropped. Non-matching listeners stay registered.
func (s *StatusManager) Update(newStatus runner.RunStatus) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	prev, tracked := s.runs[newStatus.RunID]
	switch {
	case !tracked:
		return fmt.Errorf("Cannot update non-existing runID: %s", newStatus.RunID)
	case prev.State.IsDone():
		// Terminal statuses are immutable.
		return nil
	}

	// Carry forward output refs that the caller left blank.
	if newStatus.StdoutRef == "" {
		newStatus.StdoutRef = prev.StdoutRef
	}
	if newStatus.StderrRef == "" {
		newStatus.StderrRef = prev.StderrRef
	}

	holdFields := log.Fields{
		"jobID":  newStatus.JobID,
		"taskID": newStatus.TaskID,
		"runID":  newStatus.RunID,
		"status": newStatus.State,
		"tag":    newStatus.Tag,
	}
	log.WithFields(holdFields).Info("StatusManager is holding status")
	s.runs[newStatus.RunID] = newStatus

	// Notify matching listeners and keep only the ones still waiting.
	stillWaiting := make([]queryAndCh, 0, len(s.listeners))
	for _, l := range s.listeners {
		if !l.q.Matches(newStatus) {
			stillWaiting = append(stillWaiting, l)
			continue
		}
		log.WithFields(log.Fields{"status": newStatus}).
			Debug("StatusManager is putting status on listener channel")
		l.ch <- newStatus
		close(l.ch)
	}
	s.listeners = stillWaiting
	return nil
}
// Reader interface (implements runner.StatusQuerier)
// Query returns all RunStatus'es matching q, waiting as described by wait, plus the overall
// service status.
//
// It returns immediately when the query fails, when at least one status already matches, or
// when wait.Timeout is zero. Otherwise it blocks until one of the following happens:
//   - an Update stores a status matching q: that single status is returned;
//   - wait.Timeout (if positive) elapses: nil statuses are returned; a negative Timeout
//     never expires;
//   - a receive from wait.AbortCh succeeds: a single status in state ABORTED is returned.
//
// When the wait ends by timeout or abort, the listener registered for this call is removed,
// so abandoned queries do not pile up in the StatusManager and get re-checked on every Update.
func (s *StatusManager) Query(q runner.Query, wait runner.Wait) (rs []runner.RunStatus, ss runner.ServiceStatus, e error) {
	current, listenerCh, err := s.queryAndListen(q, wait.Timeout != 0)
	if err != nil || len(current) > 0 || wait.Timeout == 0 {
		s.mu.RLock()
		defer s.mu.RUnlock()
		return current, s.svcStatus, err
	}

	// A nil channel blocks forever in select, which is what a negative Timeout asks for.
	var timeout <-chan time.Time
	if wait.Timeout > 0 {
		// Only a single expiry is ever consumed, so a one-shot Timer is used instead of a Ticker.
		timer := time.NewTimer(wait.Timeout)
		defer timer.Stop()
		timeout = timer.C
	}

	select {
	case st := <-listenerCh:
		// Update drops the listener from s.listeners when it sends, so nothing to clean up.
		s.mu.RLock()
		defer s.mu.RUnlock()
		return []runner.RunStatus{st}, s.svcStatus, nil
	case <-timeout:
		return nil, s.abandonListener(listenerCh), nil
	case <-wait.AbortCh:
		st := runner.RunStatus{State: runner.ABORTED}
		return []runner.RunStatus{st}, s.abandonListener(listenerCh), nil
	}
}

// abandonListener unregisters the listener that owns ch, if it is still registered, and
// returns the current service status, all under one acquisition of the write lock.
func (s *StatusManager) abandonListener(ch chan runner.RunStatus) runner.ServiceStatus {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i, l := range s.listeners {
		if l.ch != ch {
			continue
		}
		last := len(s.listeners) - 1
		copy(s.listeners[i:], s.listeners[i+1:])
		// Clear the vacated slot so the query and channel can be garbage collected.
		s.listeners[last] = queryAndCh{}
		s.listeners = s.listeners[:last]
		break
	}
	return s.svcStatus
}
// QueryNow returns the RunStatus'es that match q right now, plus the service status.
// It is Query with a zero-valued Wait, so it never blocks waiting for updates.
func (s *StatusManager) QueryNow(q runner.Query) ([]runner.RunStatus, runner.ServiceStatus, error) {
	var noWait runner.Wait
	return s.Query(q, noWait)
}
// Status returns the status of the run identified by id together with the service status.
// It delegates to runner.StatusNow, passing s as the querier.
func (s *StatusManager) Status(id runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) {
	runStatus, svcStatus, err := runner.StatusNow(s, id)
	return runStatus, svcStatus, err
}
// StatusAll returns the current status of every run together with the service status.
// It delegates to runner.StatusAll, passing s as the querier.
func (s *StatusManager) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) {
	statuses, svcStatus, err := runner.StatusAll(s)
	return statuses, svcStatus, err
}
// queryAndListen evaluates q against the stored runs and, when asked to listen and nothing
// matches yet, registers a listener for the next matching update.
//
// Returns:
//   - current: the statuses matching q right now;
//   - listenerCh: a channel that will carry the next matching status; non-nil only when
//     listen is true, current is empty and err is nil;
//   - err: non-nil when q names a run ID that is not tracked (see UnknownRunIDMsg).
func (s *StatusManager) queryAndListen(q runner.Query, listen bool) (
	current []runner.RunStatus, listenerCh chan runner.RunStatus, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// keepIfMatching appends st to the result when its state is selected by the query.
	keepIfMatching := func(st runner.RunStatus) {
		if q.States.Matches(st.State) {
			current = append(current, st)
		}
	}

	switch {
	case q.AllRuns:
		for _, st := range s.runs {
			keepIfMatching(st)
		}
	default:
		for _, id := range q.Runs {
			st, tracked := s.runs[id]
			if !tracked {
				return nil, nil, fmt.Errorf(UnknownRunIDMsg, id)
			}
			keepIfMatching(st)
		}
	}

	if listen && len(current) == 0 {
		// Buffer of one lets Update deliver the status without waiting for the receiver.
		waitCh := make(chan runner.RunStatus, 1)
		s.listeners = append(s.listeners, queryAndCh{q: q, ch: waitCh})
		return nil, waitCh, nil
	}
	return current, nil, nil
}