in pkg/scheduler/framework/statement.go [228:293]
// Allocate records, inside the session only, that task is placed on nodeInfo.
//
// Steps, in order:
//  1. Fetch the pod volumes for task on the node from the session cache and
//     allocate them; any later failure reverts that volume allocation.
//  2. Stamp the node name and the volumes onto the task.
//  3. Move the task to api.Allocated in its session job.
//  4. Add the task to the session node.
//  5. Invoke every registered AllocateFunc event handler.
//  6. Append an Allocate operation to the statement's operation list.
//
// It returns the first error encountered: a cache/volume error, a status
// update or AddTask error, or a "failed to find ..." error when the job or
// node is missing from the session index. On the failure paths after volume
// allocation only the volumes are reverted here; fields already written on
// task are left as they are.
func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
	volumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node)
	if err != nil {
		return err
	}

	nodeName := nodeInfo.Name
	if err = s.ssn.cache.AllocateVolumes(task, nodeName, volumes); err != nil {
		return err
	}
	// From here on, any non-nil result must undo the volume allocation. The
	// named result err is what the deferred closure inspects, so every failure
	// path below returns its error explicitly.
	defer func() {
		if err != nil {
			s.ssn.cache.RevertVolumes(task, volumes)
		}
	}()

	task.Pod.Spec.NodeName = nodeName
	task.PodVolumes = volumes

	// Only update status in session
	job, jobFound := s.ssn.Jobs[task.Job]
	if !jobFound {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when binding.",
			task.Job, s.ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}
	if err = job.UpdateTaskStatus(task, api.Allocated); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Allocated, s.ssn.UID, err)
		return err
	}

	task.NodeName = nodeName

	node, nodeFound := s.ssn.Nodes[nodeName]
	if !nodeFound {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when binding.",
			nodeName, s.ssn.UID)
		return fmt.Errorf("failed to find node %s", nodeName)
	}
	if err = node.AddTask(task); err != nil {
		klog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
			task.Namespace, task.Name, nodeName, s.ssn.UID, err)
		return err
	}
	klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
		task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)

	// Callbacks
	for _, handler := range s.ssn.eventHandlers {
		if handler.AllocateFunc == nil {
			continue
		}
		handler.AllocateFunc(&Event{
			Task: task,
		})
	}

	// Record the operation on the statement.
	klog.V(3).Info("Allocating operations ...")
	s.operations = append(s.operations, operation{
		name: Allocate,
		task: task,
	})

	return nil
}