in pkg/scheduler/plugins/extender/extender.go [140:263]
// OnSessionOpen notifies the extender that a scheduling session has started
// and registers one session callback per verb configured in ep.config.
//
// If onSessionOpenVerb is set, a snapshot of the session (jobs, nodes, queues,
// namespace info and revocable nodes) is sent through ep.send. When that call
// fails the error is logged; unless the extender is marked ignorable, the
// method then returns early and no callbacks are registered for this session.
//
// Every registered callback follows the same failure policy: an ep.send error
// is logged, and then either tolerated (ignorable extender) or surfaced to the
// scheduler as an error / util.Reject / "overused" result.
func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
	if ep.config.onSessionOpenVerb != "" {
		err := ep.send(ep.config.onSessionOpenVerb, &OnSessionOpenRequest{
			Jobs:           ssn.Jobs,
			Nodes:          ssn.Nodes,
			Queues:         ssn.Queues,
			NamespaceInfo:  ssn.NamespaceInfo,
			RevocableNodes: ssn.RevocableNodes,
		}, nil)
		if err != nil {
			klog.Warningf("OnSessionOpen failed with error %v", err)
			// A mandatory extender that cannot be reached must not take part
			// in this session, so skip registering any of its callbacks.
			if !ep.config.ignorable {
				return
			}
		}
	}

	if ep.config.predicateVerb != "" {
		ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
			resp := &PredicateResponse{}
			err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp)
			if err != nil {
				klog.Warningf("Predicate failed with error %v", err)
				if ep.config.ignorable {
					return nil
				}
				return err
			}
			// An empty error message means the extender accepted the node.
			if resp.ErrorMessage == "" {
				return nil
			}
			return errors.New(resp.ErrorMessage)
		})
	}

	if ep.config.prioritizeVerb != "" {
		ssn.AddBatchNodeOrderFn(ep.Name(), func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
			resp := &PrioritizeResponse{}
			err := ep.send(ep.config.prioritizeVerb, &PrioritizeRequest{Task: task, Nodes: nodes}, resp)
			if err != nil {
				klog.Warningf("Prioritize failed with error %v", err)
				if ep.config.ignorable {
					return nil, nil
				}
				return nil, err
			}
			if resp.ErrorMessage == "" && resp.NodeScore != nil {
				return resp.NodeScore, nil
			}
			// NOTE(review): when the response carries neither an error message
			// nor node scores this yields an error with an empty message;
			// behaviour kept as-is, confirm whether that is intended.
			return nil, errors.New(resp.ErrorMessage)
		})
	}

	if ep.config.preemptableVerb != "" {
		ssn.AddPreemptableFn(ep.Name(), func(evictor *api.TaskInfo, evictees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			resp := &PreemptableResponse{}
			err := ep.send(ep.config.preemptableVerb, &PreemptableRequest{Evictor: evictor, Evictees: evictees}, resp)
			if err != nil {
				klog.Warningf("Preemptable failed with error %v", err)
				if ep.config.ignorable {
					return nil, util.Permit
				}
				return nil, util.Reject
			}
			return resp.Victims, resp.Status
		})
	}

	if ep.config.reclaimableVerb != "" {
		ssn.AddReclaimableFn(ep.Name(), func(evictor *api.TaskInfo, evictees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			resp := &ReclaimableResponse{}
			err := ep.send(ep.config.reclaimableVerb, &ReclaimableRequest{Evictor: evictor, Evictees: evictees}, resp)
			if err != nil {
				klog.Warningf("Reclaimable failed with error %v", err)
				if ep.config.ignorable {
					return nil, util.Permit
				}
				return nil, util.Reject
			}
			return resp.Victims, resp.Status
		})
	}

	if ep.config.jobEnqueueableVerb != "" {
		ssn.AddJobEnqueueableFn(ep.Name(), func(obj interface{}) int {
			job := obj.(*api.JobInfo)
			resp := &JobEnqueueableResponse{}
			// Must use the job-enqueueable verb here; sending the reclaimable
			// verb would route the request to the wrong extender endpoint.
			err := ep.send(ep.config.jobEnqueueableVerb, &JobEnqueueableRequest{Job: job}, resp)
			if err != nil {
				klog.Warningf("JobEnqueueable failed with error %v", err)
				if ep.config.ignorable {
					return util.Permit
				}
				return util.Reject
			}
			return resp.Status
		})
	}

	if ep.config.queueOverusedVerb != "" {
		ssn.AddOverusedFn(ep.Name(), func(obj interface{}) bool {
			queue := obj.(*api.QueueInfo)
			resp := &QueueOverusedResponse{}
			// Must use the queue-overused verb here, not the reclaimable one.
			err := ep.send(ep.config.queueOverusedVerb, &QueueOverusedRequest{Queue: queue}, resp)
			if err != nil {
				klog.Warningf("QueueOverused failed with error %v", err)
				// On failure, report the queue as overused unless the extender
				// is ignorable.
				return !ep.config.ignorable
			}
			return resp.Overused
		})
	}
}