in pkg/controllers/job/job_controller_handler.go [191:288]
// updatePod processes a pod update given the previous (oldObj) and current
// (newObj) versions of the pod. Both arguments must be *v1.Pod; anything else
// is logged and ignored.
//
// The update is dropped when the new pod is not controlled by a volcano job,
// when the resource version did not change, or when any of the task-name,
// job-name or job-version annotations is missing or malformed. A pod carrying
// a deletion timestamp is handed to cc.deletePod instead.
//
// Otherwise the pod is written to the job cache and a request is added to the
// worker queue selected by the job key. The request's event defaults to
// bus.OutOfSyncEvent and is refined from the new pod phase.
func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*v1.Pod)
	if !ok {
		klog.Errorf("Failed to convert %v to v1.Pod", oldObj)
		return
	}

	newPod, ok := newObj.(*v1.Pod)
	if !ok {
		klog.Errorf("Failed to convert %v to v1.Pod", newObj)
		return
	}

	// Filter out pods that are not created from volcano job
	if !isControlledBy(newPod, helpers.JobKind) {
		return
	}

	// Identical resource versions mean nothing changed between the two objects.
	if newPod.ResourceVersion == oldPod.ResourceVersion {
		return
	}

	// A pod that is being deleted is processed by the deletion path.
	if newPod.DeletionTimestamp != nil {
		cc.deletePod(newObj)
		return
	}

	taskName, found := newPod.Annotations[batch.TaskSpecKey]
	if !found {
		klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	jobName, found := newPod.Annotations[batch.JobNameKey]
	if !found {
		klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	version, found := newPod.Annotations[batch.JobVersion]
	if !found {
		klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	// Parse with an explicit 32-bit bound: the request stores the version as
	// int32, so a value outside that range must be rejected here rather than
	// silently truncated by the conversion below.
	dVersion, err := strconv.ParseInt(version, 10, 32)
	if err != nil {
		klog.Infof("Failed to convert jobVersion of Pod into number <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	// A cache failure is only logged; the request is still enqueued below.
	// The cache is updated before the TaskCompleted/TaskFailed queries so they
	// are evaluated after this pod's new state has been submitted to the cache.
	if err := cc.cache.UpdatePod(newPod); err != nil {
		klog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
			newPod.Namespace, newPod.Name, err)
	}

	event := bus.OutOfSyncEvent
	var exitCode int32

	switch newPod.Status.Phase {
	case v1.PodFailed:
		// Only the transition into Failed raises PodFailedEvent.
		if oldPod.Status.Phase != v1.PodFailed {
			event = bus.PodFailedEvent
			// TODO: currently only one container pod is supported by volcano
			// Once multi containers pod is supported, update accordingly.
			if len(newPod.Status.ContainerStatuses) > 0 && newPod.Status.ContainerStatuses[0].State.Terminated != nil {
				exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode
			}
		}
	case v1.PodSucceeded:
		// Only the transition into Succeeded is considered, and only when the
		// cache reports the task as completed.
		if oldPod.Status.Phase != v1.PodSucceeded &&
			cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
			event = bus.TaskCompletedEvent
		}
	case v1.PodPending, v1.PodRunning:
		if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
			event = bus.TaskFailedEvent
		}
	}

	req := apis.Request{
		Namespace:  newPod.Namespace,
		JobName:    jobName,
		TaskName:   taskName,
		Event:      event,
		ExitCode:   exitCode,
		JobVersion: int32(dVersion), // safe: ParseInt bounded the value to 32 bits
	}

	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)
	queue.Add(req)
}