in pkg/scheduler/framework/session.go [289:360]
func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
podVolumes, err := ssn.cache.GetPodVolumes(task, nodeInfo.Node)
if err != nil {
return err
}
hostname := nodeInfo.Name
if err := ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil {
return err
}
defer func() {
if err != nil {
ssn.cache.RevertVolumes(task, podVolumes)
}
}()
task.Pod.Spec.NodeName = hostname
task.PodVolumes = podVolumes
// Only update status in session
job, found := ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
task.Namespace, task.Name, api.Allocated, ssn.UID, err)
return err
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when binding.",
task.Job, ssn.UID)
return fmt.Errorf("failed to find job %s", task.Job)
}
task.NodeName = hostname
if node, found := ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, ssn.UID, err)
return err
}
klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
klog.Errorf("Failed to find Node <%s> in Session <%s> index when binding.",
hostname, ssn.UID)
return fmt.Errorf("failed to find node %s", hostname)
}
// Callbacks
for _, eh := range ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
Task: task,
})
}
}
if ssn.JobReady(job) {
for _, task := range job.TaskStatusIndex[api.Allocated] {
if err := ssn.dispatch(task); err != nil {
klog.Errorf("Failed to dispatch task <%v/%v>: %v",
task.Namespace, task.Name, err)
return err
}
}
} else {
ssn.cache.RevertVolumes(task, podVolumes)
}
return nil
}