scheduler/domain/definitions.go (168 lines of code) (raw):
// Package domain provides definitions for Scoot Jobs and Tasks
package domain
import (
"fmt"
"time"
"github.com/twitter/scoot/common/log/tags"
"github.com/twitter/scoot/common/thrifthelpers"
"github.com/twitter/scoot/runner"
schedthrift "github.com/twitter/scoot/scheduler/domain/gen-go/sched"
)
// Job is the job Scoot can schedule
type Job struct {
Id string
Def JobDefinition
}
// Serialize Job to binary slice, and error is
// returned if the object cannot be Serialized
func (j *Job) Serialize() ([]byte, error) {
thriftJob, err := makeThriftJobFromDomainJob(j)
if err != nil {
return nil, err
}
return thrifthelpers.BinarySerialize(thriftJob)
}
// Desrialize a binary slice to a Job,
// an error is returned if it cannot be deserialized.
func DeserializeJob(input []byte) (*Job, error) {
thriftJob := schedthrift.NewJob()
err := thrifthelpers.BinaryDeserialize(thriftJob, input)
if err != nil {
return nil, err
}
job := makeDomainJobFromThriftJob(thriftJob)
return job, nil
}
// JobDefinition is the definition the client sent us
type JobDefinition struct {
JobType string
Requestor string
Basis string
Tag string
Priority Priority
Tasks []TaskDefinition
}
func (jd *JobDefinition) String() string {
return fmt.Sprintf("jobType:%s, req:%s, tag:%s, basis: %s, tasks:%d", jd.JobType, jd.Requestor, jd.Tag, jd.Basis, len(jd.Tasks))
}
// Task is one task to run
type TaskDefinition struct {
runner.Command
}
type OfflineWorkerReq struct {
ID string
Requestor string
}
type ReinstateWorkerReq struct {
ID string
Requestor string
}
// Status for Job & Tasks
type Status int
const (
// NotRunning, waiting to be scheduled
NotStarted Status = iota
// Currently Scheduled and In Progress Job/Task
InProgress
// Successfully Completed Job/Task
Completed
// Job was Aborted, Compensating Tasks are being Applied.
// A RollingBack task has not finished its compensating
// tasks yet.
RollingBack
// Job/Task finished unsuccessfully all compensating actions
// have been applied.
RolledBack
)
func (s Status) String() string {
asString := [6]string{"NotStarted", "InProgress", "Completed", "Killed", "RollingBack", "RolledBack"}
return asString[s]
}
type Priority int
const (
// Default, queue new runs until any resources are available
P0 Priority = iota
// Run asap, ahead of priority=0, consuming nodes as they become available if no nodes are free
P1
// Run asap, ahead of priority=1, consuming nodes as they become available if no nodes are free
P2
)
// transforms a thrift Job into a scheduler Job
func makeDomainJobFromThriftJob(thriftJob *schedthrift.Job) *Job {
if thriftJob == nil {
return nil
}
jobType := ""
priority := int32(0)
tag := ""
basis := ""
requestor := ""
thriftJobDef := thriftJob.GetJobDefinition()
jobID := thriftJob.GetID()
domainTasks := make([]TaskDefinition, 0)
if thriftJobDef != nil {
for _, task := range thriftJobDef.GetTasks() {
cmd := task.GetCommand()
command := runner.Command{
Argv: cmd.GetArgv(),
EnvVars: cmd.GetEnvVars(),
Timeout: time.Duration(cmd.GetTimeout()),
SnapshotID: cmd.GetSnapshotId(),
LogTags: tags.LogTags{
JobID: jobID,
TaskID: task.GetTaskId(),
Tag: thriftJobDef.GetTag(),
},
}
domainTasks = append(domainTasks, TaskDefinition{command})
}
jobType = thriftJobDef.GetJobType()
priority = thriftJobDef.GetPriority()
tag = thriftJobDef.GetTag()
basis = thriftJobDef.GetBasis()
requestor = thriftJobDef.GetRequestor()
}
domainJobDef := JobDefinition{
JobType: jobType,
Tasks: domainTasks,
Priority: Priority(priority),
Basis: basis,
Requestor: requestor,
Tag: tag,
}
return &Job{
Id: jobID,
Def: domainJobDef,
}
}
// converts a scheduler Job into a Thrift Job
func makeThriftJobFromDomainJob(domainJob *Job) (*schedthrift.Job, error) {
if domainJob == nil {
return nil, nil
}
thriftTasks := make([]*schedthrift.TaskDefinition, 0)
for _, domainTask := range domainJob.Def.Tasks {
to := int64(domainTask.Timeout)
cmd := schedthrift.Command{
Argv: domainTask.Argv,
EnvVars: domainTask.EnvVars,
Timeout: &to,
SnapshotId: domainTask.SnapshotID,
}
taskId := domainTask.TaskID
thriftTask := schedthrift.TaskDefinition{Command: &cmd, TaskId: &taskId}
thriftTasks = append(thriftTasks, &thriftTask)
}
prio := int32(domainJob.Def.Priority)
thriftJobDefinition := schedthrift.JobDefinition{
JobType: &(*domainJob).Def.JobType,
Tasks: thriftTasks,
Priority: &prio,
Tag: &(domainJob).Def.Tag,
Basis: &(domainJob).Def.Basis,
Requestor: &(domainJob).Def.Requestor,
}
thriftJob := schedthrift.Job{
ID: domainJob.Id,
JobDefinition: &thriftJobDefinition,
}
return &thriftJob, nil
}
// Validate a job, returning an *InvalidJobRequest if invalid.
func ValidateJob(job JobDefinition) error {
if len(job.Tasks) == 0 {
return fmt.Errorf("invalid job. Must have at least 1 task; was empty")
}
for _, task := range job.Tasks {
if task.TaskID == "" {
return fmt.Errorf("invalid task id \"\".")
}
if len(task.Command.Argv) == 0 {
return fmt.Errorf("invalid task.Command.Argv. Must have at least one argument; was empty")
}
}
return nil
}
func ValidateMaxTasks(maxTasks int) error {
// validation is also implemented in scheduler/client/client.go. This implementation cannot be used
// there because it causes a circular dependency. The two implementations can be consolidated
// when the code is restructured
if maxTasks < -1 {
return fmt.Errorf("invalid tasks limit:%d. Must be >= -1.", maxTasks)
}
return nil
}