in pkg/controllers/job/job_controller_actions.go [47:164]
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	klog.V(3).Infof("Killing Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
	defer klog.V(3).Infof("Finished Job <%s/%s> killing, current version %d", job.Namespace, job.Name, job.Status.Version)

	if job.DeletionTimestamp != nil {
		klog.Infof("Job <%s/%s> is terminating, skip management process.",
			job.Namespace, job.Name)
		return nil
	}
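
	// Counters for pod phases across all tasks of the job.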
	var pending, running, terminating, succeeded, failed, unknown int32
	taskStatusCount := make(map[string]batch.TaskState)

	var errs []error
	var total int
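
	// Walk every pod of the job: delete pods whose phase is not retained;
	// surviving pods (including those whose deletion failed) are tallied by phase and by task.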
	for _, pods := range jobInfo.Pods {
		for _, pod := range pods {
			total++

			if pod.DeletionTimestamp != nil {
				klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
				terminating++
				continue
			}

			maxRetry := job.Spec.MaxRetry
			lastRetry := false
			if job.Status.RetryCount >= maxRetry-1 {
				lastRetry = true
			}

			// Only retain the Failed and Succeeded pods at the last retry.
			// If it is not the last retry, kill pods as defined in `podRetainPhase`.
			retainPhase := podRetainPhase
			if lastRetry {
				retainPhase = state.PodRetainPhaseSoft
			}
			_, retain := retainPhase[pod.Status.Phase]

			if !retain {
				err := cc.deleteJobPod(job.Name, pod)
				if err == nil {
					terminating++
					continue
				}
				// Record the error, then collect the pod's info as for a retained pod.
				errs = append(errs, err)
				cc.resyncTask(pod)
			}

			classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
			calcPodStatus(pod, taskStatusCount)
		}
	}
	if len(errs) != 0 {
		klog.Errorf("failed to kill pods for job %s/%s, with err %+v", job.Namespace, job.Name, errs)
		cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
			fmt.Sprintf("Error deleting pods: %+v", errs))
		return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
	}
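
	// Update the status on a deep copy rather than mutating the shared object from the job cache.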
	job = job.DeepCopy()
	// Job version is bumped only when job is killed.
	job.Status.Version++
	job.Status.Pending = pending
	job.Status.Running = running
	job.Status.Succeeded = succeeded
	job.Status.Failed = failed
	job.Status.Terminating = terminating
	job.Status.Unknown = unknown
	job.Status.TaskStatusCount = taskStatusCount

	// Update running duration
	klog.V(3).Infof("Running duration is %s", metav1.Duration{Duration: time.Since(jobInfo.Job.CreationTimestamp.Time)}.ToUnstructured())
	job.Status.RunningDuration = &metav1.Duration{Duration: time.Since(jobInfo.Job.CreationTimestamp.Time)}
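
	// Apply the caller-supplied state transition; when it reports a change,
	// stamp the transition time and append a matching condition.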
	if updateStatus != nil {
		if updateStatus(&job.Status) {
			job.Status.State.LastTransitionTime = metav1.Now()
			jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
			job.Status.Conditions = append(job.Status.Conditions, jobCondition)
		}
	}

	// Must be called before updating the job status.
	if err := cc.pluginOnJobDelete(job); err != nil {
		return err
	}
	// Update Job status
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return err
	}
	if e := cc.cache.Update(newJob); e != nil {
		klog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
			newJob.Namespace, newJob.Name, e)
		return e
	}
	// Delete PodGroup; a NotFound error is ignored so the call stays idempotent.
	if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}); err != nil {
		if !apierrors.IsNotFound(err) {
			klog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
				job.Namespace, job.Name, err)
			return err
		}
	}

	// NOTE(k82cn): DO NOT delete input/output until job is deleted.
	return nil
}