scheduler/server/job_state.go (229 lines of code) (raw):
package server
import (
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
log "github.com/sirupsen/logrus"
"github.com/twitter/scoot/saga"
"github.com/twitter/scoot/scheduler/domain"
)
type taskStateByTaskID map[string]*taskState
// Contains all the information for a job in progress
// Note: Only Job, Saga, and Tasks are provided during scheduler recovery. Anything else must be initialized separately.
type jobState struct {
Job *domain.Job
Saga *saga.Saga //saga associated with this job
Tasks []*taskState //ordered list of taskState
EndingSaga bool //denotes whether an EndSagaMsg is in progress or not
TasksCompleted int //number of tasks that've been marked completed so far.
TasksRunning int //number of tasks that've been scheduled or started.
stateMu sync.RWMutex
JobKilled bool //indicates the job was killed
TimeCreated time.Time //when was this job first created
TimeMarker time.Time //when was this job last marked (i.e. for reporting purposes)
jobClass string
tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID
}
// Contains all the information for a specified task
type taskState struct {
JobId string
TaskId string
Def domain.TaskDefinition
Status domain.Status
TimeStarted time.Time
NumTimesTried int
TaskRunner *taskRunner
AvgDuration time.Duration //average duration for previous runs with this taskId, if any.
}
type taskStatesByDuration []*taskState
// the following types are used to access task State objects by class, startTime, taskID
type taskClassAndStartKey struct {
class string
start time.Time
}
type jobIDTaskIDKey struct {
jobID string
taskID string
}
type taskStateByJobIDTaskID map[jobIDTaskIDKey]*taskState
func (s taskStatesByDuration) Len() int {
return len(s)
}
func (s taskStatesByDuration) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s taskStatesByDuration) Less(i, j int) bool {
return s[i].AvgDuration < s[j].AvgDuration
}
// Creates a New Job State based on the specified Job and Saga
// The jobState will reflect any previous progress made on this job and logged to the Sagalog
// Note: taskDurations is optional and only used to enable sorts using taskStatesByDuration above.
func newJobState(job *domain.Job, jobClass string, saga *saga.Saga, taskDurations *lru.Cache,
tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID, durationKeyExtractor func(string) string) *jobState {
j := &jobState{
Job: job,
Saga: saga,
Tasks: make([]*taskState, 0),
EndingSaga: false,
TasksCompleted: 0,
TasksRunning: 0,
JobKilled: false,
TimeCreated: time.Now(),
TimeMarker: time.Now(),
jobClass: jobClass,
tasksByJobClassAndStartTimeSec: tasksByJobClassAndStartTimeSec,
}
for _, taskDef := range job.Def.Tasks {
var duration time.Duration
durationKey := durationKeyExtractor(taskDef.TaskID)
if taskDurations != nil {
if iface, ok := taskDurations.Get(durationKey); !ok {
addOrUpdateTaskDuration(taskDurations, durationKey, duration)
} else {
if ad, ok := iface.(*averageDuration); ok {
duration = ad.duration
}
}
}
task := &taskState{
JobId: job.Id,
TaskId: taskDef.TaskID,
Def: taskDef,
Status: domain.NotStarted,
TimeStarted: nilTime,
NumTimesTried: 0,
AvgDuration: duration,
}
j.Tasks = append(j.Tasks, task)
}
// Assumes Forward Recovery only, tasks are either
// done or not done. Scheduler currently doesn't support
// scheduling compensating tasks. In Progress tasks
// are considered not done and will be rescheduled.
sagaState := saga.GetState()
for _, taskId := range sagaState.GetTaskIds() {
if sagaState.IsTaskCompleted(taskId) {
j.getTask(taskId).Status = domain.Completed
j.TasksCompleted++
}
}
return j
}
// Helper, assumes that taskId is present given a consistent jobState.
func (j *jobState) getTask(taskId string) *taskState {
for _, task := range j.Tasks {
if task.TaskId == taskId {
return task
}
}
return nil
}
// Returns a list of taskIds that can be scheduled currently.
func (j *jobState) getUnScheduledTasks() []*taskState {
var tasksToRun []*taskState
for _, state := range j.Tasks {
if state.Status == domain.NotStarted {
tasksToRun = append(tasksToRun, state)
}
}
return tasksToRun
}
// Update JobState to reflect that a Task has been started
func (j *jobState) taskStarted(taskId string, tr *taskRunner) {
taskState := j.getTask(taskId)
start := time.Now()
taskState.Status = domain.InProgress
taskState.TimeStarted = start
taskState.TaskRunner = tr
taskState.NumTimesTried++
j.stateMu.Lock()
j.TasksRunning++
j.stateMu.Unlock()
// add the task to the map of tasks by start time
startTimeSec := taskState.TimeStarted.Truncate(time.Second)
j.addTaskToStartTimeMap(j.jobClass, taskState, startTimeSec)
j.logInconsistentStateValues()
}
// Update JobState to reflect that a Task has been completed
// Running param: true if taskStarted was called for this taskId.
func (j *jobState) taskCompleted(taskId string, running bool) {
taskState := j.getTask(taskId)
startTimeSec := taskState.TimeStarted.Truncate(time.Second)
taskState.Status = domain.Completed
taskState.TimeStarted = nilTime
taskState.TaskRunner = nil
j.stateMu.Lock()
j.TasksCompleted++
if running {
j.TasksRunning--
}
j.stateMu.Unlock()
// remove the task from the map of tasks by start time
j.removeTaskFromStartTimeMap(taskState.JobId, taskId, startTimeSec)
j.logInconsistentStateValues()
}
// Update JobState to reflect that an error has occurred running this Task
func (j *jobState) errorRunningTask(taskId string, err error, preempted bool) {
taskState := j.getTask(taskId)
startTimeSec := taskState.TimeStarted.Truncate(time.Second)
taskState.Status = domain.NotStarted
taskState.TimeStarted = nilTime
taskState.TaskRunner = nil
j.stateMu.Lock()
j.TasksRunning--
j.stateMu.Unlock()
if preempted {
taskState.NumTimesTried--
}
j.removeTaskFromStartTimeMap(taskState.JobId, taskId, startTimeSec)
j.logInconsistentStateValues()
}
func (j *jobState) GetNumRunning() int {
j.stateMu.RLock()
defer j.stateMu.RUnlock()
return j.TasksRunning
}
func (j *jobState) GetNumCompleted() int {
j.stateMu.RLock()
defer j.stateMu.RUnlock()
return j.TasksCompleted
}
// Returns the Current Job Status
func (j *jobState) getJobStatus() domain.Status {
if j.GetNumCompleted() == len(j.Tasks) {
return domain.Completed
}
return domain.InProgress
}
// addTaskToStartTimeMap add the running task to the map that bins running tasks by their class and start time
func (j *jobState) addTaskToStartTimeMap(jobClass string, task *taskState, startTimeSec time.Time) {
if j.tasksByJobClassAndStartTimeSec == nil {
log.Errorf("tasksByJobClassAndStartTime map not found. Skipping adding task to it. jobID: %s, taskID:%s, jobClass:%s, startTime:%s", task.JobId, task.TaskId, jobClass, startTimeSec.Format("2006-01-02 15:04:05 -0700 MST"))
return
}
classNStartBucketKey := taskClassAndStartKey{class: jobClass, start: startTimeSec}
if _, ok := j.tasksByJobClassAndStartTimeSec[classNStartBucketKey]; !ok {
j.tasksByJobClassAndStartTimeSec[classNStartBucketKey] = taskStateByJobIDTaskID{}
}
taskKey := jobIDTaskIDKey{jobID: task.JobId, taskID: task.TaskId}
j.tasksByJobClassAndStartTimeSec[classNStartBucketKey][taskKey] = task
}
// removeTaskFromStartTimeMap remove the completed task from the map that bins running tasks by their class and start time
func (j *jobState) removeTaskFromStartTimeMap(jobID string, taskID string, startTimeSec time.Time) {
if j.tasksByJobClassAndStartTimeSec == nil {
log.Warnf("tasksByJobClassAndStartTime map not found. Skipping removing task from it. jobID: %s, taskID: %s, jobClass:%s, startTime:%s", jobID, taskID, j.jobClass, startTimeSec.Format("2006-01-02 15:04:05 -0700 MST"))
return
}
timeBucket := taskClassAndStartKey{class: j.jobClass, start: startTimeSec}
if _, ok := j.tasksByJobClassAndStartTimeSec[timeBucket]; !ok {
log.Warnf("no %s start time bucket found for the time %s. Skipping removing task %s_%s from it", j.jobClass, startTimeSec.Format("2006-01-02 15:04:05 -0700 MST"), jobID, taskID)
return
}
taskKey := jobIDTaskIDKey{jobID: jobID, taskID: taskID}
if _, ok := j.tasksByJobClassAndStartTimeSec[timeBucket][taskKey]; !ok {
log.Warnf("task %s_%s was not found in %s time bucket found for the job %s. Skipping removing task from it", jobID, taskID, j.jobClass, startTimeSec.Format("2006-01-02 15:04:05 -0700 MST"))
return
}
delete(j.tasksByJobClassAndStartTimeSec[timeBucket], taskKey)
if len(j.tasksByJobClassAndStartTimeSec[timeBucket]) == 0 {
delete(j.tasksByJobClassAndStartTimeSec, timeBucket)
}
}
func (j *jobState) logInconsistentStateValues() {
// TODO remove before deploying to prod or if this slows staging down too much
running := 0
for classNStartKey, v := range j.tasksByJobClassAndStartTimeSec {
if classNStartKey.class != j.jobClass {
continue
}
for jobIDnTaskID, taskState := range v {
if jobIDnTaskID.jobID != j.Job.Id {
continue
}
if taskState.Status == domain.InProgress {
running++
}
}
}
if running != j.TasksRunning {
log.Errorf("inconsistent job state: tasksByJobClassAndStartTimeSec has %d running tasks for job %s,%s,%s, but jobState and %d running tasks",
running, j.jobClass, j.Job.Def.Requestor, j.Job.Id, j.TasksRunning)
}
}