func()

in pkg/controllers/job/job_controller_handler.go [191:288]


func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*v1.Pod)
	if !ok {
		klog.Errorf("Failed to convert %v to v1.Pod", oldObj)
		return
	}

	newPod, ok := newObj.(*v1.Pod)
	if !ok {
		klog.Errorf("Failed to convert %v to v1.Pod", newObj)
		return
	}

	// Filter out pods that are not created from volcano job
	if !isControlledBy(newPod, helpers.JobKind) {
		return
	}

	if newPod.ResourceVersion == oldPod.ResourceVersion {
		return
	}

	if newPod.DeletionTimestamp != nil {
		cc.deletePod(newObj)
		return
	}

	taskName, found := newPod.Annotations[batch.TaskSpecKey]
	if !found {
		klog.Infof("Failed to find taskName of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	jobName, found := newPod.Annotations[batch.JobNameKey]
	if !found {
		klog.Infof("Failed to find jobName of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	version, found := newPod.Annotations[batch.JobVersion]
	if !found {
		klog.Infof("Failed to find jobVersion of Pod <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	dVersion, err := strconv.Atoi(version)
	if err != nil {
		klog.Infof("Failed to convert jobVersion of Pod into number <%s/%s>, skipping",
			newPod.Namespace, newPod.Name)
		return
	}

	if err := cc.cache.UpdatePod(newPod); err != nil {
		klog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
			newPod.Namespace, newPod.Name, err)
	}

	event := bus.OutOfSyncEvent
	var exitCode int32

	switch newPod.Status.Phase {
	case v1.PodFailed:
		if oldPod.Status.Phase != v1.PodFailed {
			event = bus.PodFailedEvent
			// TODO: currently only one container pod is supported by volcano
			// Once multi containers pod is supported, update accordingly.
			if len(newPod.Status.ContainerStatuses) > 0 && newPod.Status.ContainerStatuses[0].State.Terminated != nil {
				exitCode = newPod.Status.ContainerStatuses[0].State.Terminated.ExitCode
			}
		}
	case v1.PodSucceeded:
		if oldPod.Status.Phase != v1.PodSucceeded &&
			cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
			event = bus.TaskCompletedEvent
		}
	case v1.PodPending, v1.PodRunning:
		if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
			event = bus.TaskFailedEvent
		}
	}

	req := apis.Request{
		Namespace: newPod.Namespace,
		JobName:   jobName,
		TaskName:  taskName,

		Event:      event,
		ExitCode:   exitCode,
		JobVersion: int32(dVersion),
	}

	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)
	queue.Add(req)
}