func()

in scheduler/server/stateful_scheduler.go [380:475]


func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, error) {
	/*
		Put the job request and a callback channel on the check job channel.  Wait for the
		scheduling thread to pick up the request from the check job channel, verify that it
		can handle the request and return either nil or an error on the callback channel.

		If no error is found, generate an id for the job, start a saga for the job and add the
		job to the add job channel.

		 Return either the error message or job id to the caller.
	*/
	defer s.stat.Latency(stats.SchedJobLatency_ms).Time().Stop()
	s.stat.Counter(stats.SchedJobRequestsCounter).Inc(1)
	log.WithFields(
		log.Fields{
			"requestor": jobDef.Requestor,
			"jobType":   jobDef.JobType,
			"tag":       jobDef.Tag,
			"basis":     jobDef.Basis,
			"priority":  jobDef.Priority,
			"numTasks":  len(jobDef.Tasks),
		}).Info("New job request")

	checkResultCh := make(chan error, 1)
	s.checkJobCh <- jobCheckMsg{
		jobDef:   &jobDef,
		resultCh: checkResultCh,
	}
	err := <-checkResultCh
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobDef":    jobDef,
				"requestor": jobDef.Requestor,
				"jobType":   jobDef.JobType,
				"tag":       jobDef.Tag,
				"basis":     jobDef.Basis,
				"priority":  jobDef.Priority,
				"err":       err,
			}).Error("Rejected job request")
		return "", err
	}

	job := &domain.Job{
		Id:  generateJobId(),
		Def: jobDef,
	}
	if job.Def.Tag == "" {
		job.Def.Tag = job.Id
	}

	asBytes, err := job.Serialize()
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobDef":    jobDef,
				"requestor": jobDef.Requestor,
				"jobType":   jobDef.JobType,
				"tag":       jobDef.Tag,
				"basis":     jobDef.Basis,
				"priority":  jobDef.Priority,
				"err":       err,
			}).Error("Failed to serialize job request")
		return "", err
	}

	// Log StartSaga Message
	sagaObj, err := s.sagaCoord.MakeSaga(job.Id, asBytes)
	if err != nil {
		log.WithFields(
			log.Fields{
				"jobDef":    jobDef,
				"err":       err,
				"requestor": jobDef.Requestor,
				"jobType":   jobDef.JobType,
				"tag":       jobDef.Tag,
			}).Error("Failed to create saga for job request")
		return "", err
	}
	log.WithFields(
		log.Fields{
			"requestor": jobDef.Requestor,
			"jobType":   jobDef.JobType,
			"tag":       jobDef.Tag,
			"basis":     jobDef.Basis,
			"priority":  jobDef.Priority,
			"numTasks":  len(jobDef.Tasks),
		}).Info("Queueing job request")
	s.stat.Counter(stats.SchedJobsCounter).Inc(1)
	s.addJobCh <- jobAddedMsg{
		job:  job,
		saga: sagaObj,
	}

	return job.Id, nil
}