in scheduler/server/stateful_scheduler.go [380:475]
// ScheduleJob asks the scheduling loop to validate a job request and, if it is
// accepted, registers the job and hands it over for scheduling.
//
// The steps are:
//  1. Put the job definition and a callback channel on the check job channel,
//     then block until a result (nil or an error) arrives on the callback
//     channel.
//  2. If the request was rejected, return that error.
//  3. Otherwise generate a job id, default an empty tag to the job id,
//     serialize the job and create a saga for it.
//  4. Put the job and its saga on the add job channel.
//
// It returns the new job id on success. On failure it returns an empty id and
// the error from the check, from serialization, or from saga creation.
func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, error) {
	defer s.stat.Latency(stats.SchedJobLatency_ms).Time().Stop()
	s.stat.Counter(stats.SchedJobRequestsCounter).Inc(1)
	log.WithFields(
		log.Fields{
			"requestor": jobDef.Requestor,
			"jobType":   jobDef.JobType,
			"tag":       jobDef.Tag,
			"basis":     jobDef.Basis,
			"priority":  jobDef.Priority,
			"numTasks":  len(jobDef.Tasks),
		}).Info("New job request")

	// The callback channel has capacity 1, so whoever reports the check result
	// can send it without waiting for this goroutine to be receiving.
	checkResultCh := make(chan error, 1)
	s.checkJobCh <- jobCheckMsg{
		jobDef:   &jobDef,
		resultCh: checkResultCh,
	}
	err := <-checkResultCh
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobDef":    jobDef,
				"requestor": jobDef.Requestor,
				"jobType":   jobDef.JobType,
				"tag":       jobDef.Tag,
				"basis":     jobDef.Basis,
				"priority":  jobDef.Priority,
				"err":       err,
			}).Error("Rejected job request")
		return "", err
	}

	job := &domain.Job{
		Id:  generateJobId(),
		Def: jobDef,
	}
	if job.Def.Tag == "" {
		job.Def.Tag = job.Id
	}
	// From here on, log from job.Def rather than jobDef: job.Def is a copy whose
	// tag may just have been defaulted to the job id, so jobDef.Tag can be stale.

	asBytes, err := job.Serialize()
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobId":     job.Id,
				"jobDef":    job.Def,
				"requestor": job.Def.Requestor,
				"jobType":   job.Def.JobType,
				"tag":       job.Def.Tag,
				"basis":     job.Def.Basis,
				"priority":  job.Def.Priority,
				"err":       err,
			}).Error("Failed to serialize job request")
		return "", err
	}

	// Create the saga for this job, keyed by the job id and carrying the
	// serialized job (the original note here reads "Log StartSaga Message").
	sagaObj, err := s.sagaCoord.MakeSaga(job.Id, asBytes)
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobId":     job.Id,
				"jobDef":    job.Def,
				"err":       err,
				"requestor": job.Def.Requestor,
				"jobType":   job.Def.JobType,
				"tag":       job.Def.Tag,
				"basis":     job.Def.Basis,
				"priority":  job.Def.Priority,
			}).Error("Failed to create saga for job request")
		return "", err
	}

	log.WithFields(
		log.Fields{
			"jobId":     job.Id,
			"requestor": job.Def.Requestor,
			"jobType":   job.Def.JobType,
			"tag":       job.Def.Tag,
			"basis":     job.Def.Basis,
			"priority":  job.Def.Priority,
			"numTasks":  len(job.Def.Tasks),
		}).Info("Queueing job request")
	s.stat.Counter(stats.SchedJobsCounter).Inc(1)
	s.addJobCh <- jobAddedMsg{
		job:  job,
		saga: sagaObj,
	}
	return job.Id, nil
}