in pkg/controllers/job/job_controller.go [288:353]
// processNextReq takes one item from the worker queue at index count and
// handles it.
//
// The request is resolved to a cached job, the action to run is chosen by
// applyPolicies, and the action is executed against the job's current state.
// If execution fails, the request is re-added with rate limiting until
// cc.maxRequeueNum requeues have been recorded (-1 disables the limit); once
// the limit is reached, TerminateJobAction is executed and the request is
// dropped.
//
// It returns false only when the queue reports shutdown; in every other case
// it returns true, including when the request could not be handled.
func (cc *jobcontroller) processNextReq(count uint32) bool {
	queue := cc.queueList[count]
	obj, shutdown := queue.Get()
	if shutdown {
		klog.Errorf("Fail to pop item from queue")
		return false
	}
	// Mark the item as done on every exit path, including the
	// unexpected-type path below.
	defer queue.Done(obj)

	req, ok := obj.(apis.Request)
	if !ok {
		// An item of an unexpected type can never be handled; drop it instead
		// of panicking the worker.
		klog.Errorf("Unexpected item <%v> of type <%T> in queue, dropping it", obj, obj)
		queue.Forget(obj)
		return true
	}

	key := jobcache.JobKeyByReq(&req)
	if !cc.belongsToThisRoutine(key, count) {
		// The request landed on the wrong worker: hand it over to the queue
		// selected for its key.
		klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
		queueLocal := cc.getWorkerQueue(key)
		queueLocal.Add(req)
		return true
	}

	klog.V(3).Infof("Try to handle request <%v>", req)

	jobInfo, err := cc.cache.Get(key)
	if err != nil {
		// TODO(k82cn): ignore not-ready error.
		klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
		return true
	}

	st := state.NewState(jobInfo)
	if st == nil {
		klog.Errorf("Invalid state <%s> of Job <%v/%v>",
			jobInfo.Job.Status.State.Phase, jobInfo.Job.Namespace, jobInfo.Job.Name)
		return true
	}

	action := applyPolicies(jobInfo.Job, &req)
	klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
		action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)

	// Only actions other than the sync action are recorded as job events.
	if action != busv1alpha1.SyncJobAction {
		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Start to execute action %s ", action))
	}

	if err := st.Execute(action); err != nil {
		if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
			klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
				jobInfo.Job.Namespace, jobInfo.Job.Name, err)
			// If any error, requeue it.
			queue.AddRateLimited(req)
			return true
		}

		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Job failed on action %s for retry limit reached", action))
		klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
		// Keep the terminate error in its own variable so the warning below
		// still reports the error that exhausted the retries.
		if termErr := st.Execute(busv1alpha1.TerminateJobAction); termErr != nil {
			klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, termErr)
		}
		klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
	}

	// Reached on success or after the retry limit: forget the request.
	queue.Forget(req)
	return true
}