pkg/scheduler/plugins/extender/extender.go (210 lines of code) (raw):
/*
Copyright 2022 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package extender
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"k8s.io/klog"
"net/http"
"strings"
"time"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/util"
)
// PluginName is the name reported by the extender plugin.
const PluginName = "extender"

// Keys looked up in the plugin arguments when the extender is configured.
const (
	// ExtenderURLPrefix selects the extender endpoint address.
	ExtenderURLPrefix = "extender.urlPrefix"
	// ExtenderHTTPTimeout selects the timeout applied to extender HTTP calls.
	ExtenderHTTPTimeout = "extender.httpTimeout"

	// ExtenderOnSessionOpenVerb selects the verb used by OnSessionOpen.
	ExtenderOnSessionOpenVerb = "extender.onSessionOpenVerb"
	// ExtenderOnSessionCloseVerb selects the verb used by OnSessionClose.
	ExtenderOnSessionCloseVerb = "extender.onSessionCloseVerb"

	// ExtenderPredicateVerb selects the verb used by the Predicate callback.
	ExtenderPredicateVerb = "extender.predicateVerb"
	// ExtenderPrioritizeVerb selects the verb used by the Prioritize callback.
	ExtenderPrioritizeVerb = "extender.prioritizeVerb"
	// ExtenderPreemptableVerb selects the verb used by the Preemptable callback.
	ExtenderPreemptableVerb = "extender.preemptableVerb"
	// ExtenderReclaimableVerb selects the verb used by the Reclaimable callback.
	ExtenderReclaimableVerb = "extender.reclaimableVerb"
	// ExtenderQueueOverusedVerb selects the verb used by the QueueOverused callback.
	ExtenderQueueOverusedVerb = "extender.queueOverusedVerb"
	// ExtenderJobEnqueueableVerb selects the verb used by the JobEnqueueable callback.
	ExtenderJobEnqueueableVerb = "extender.jobEnqueueableVerb"

	// ExtenderIgnorable selects whether unexpected extender errors may be ignored.
	ExtenderIgnorable = "extender.ignorable"
)
// extenderConfig holds the extender settings read from the plugin arguments.
type extenderConfig struct {
	// urlPrefix is the base address; a verb is appended to it to form each
	// request URL.
	urlPrefix string
	// httpTimeout is used as the timeout of the plugin's HTTP client.
	httpTimeout time.Duration

	// Session lifecycle verbs. An empty verb means no request is sent.
	onSessionOpenVerb  string
	onSessionCloseVerb string

	// Callback verbs. A callback is registered on the session only when its
	// verb is non-empty.
	predicateVerb      string
	prioritizeVerb     string
	preemptableVerb    string
	reclaimableVerb    string
	queueOverusedVerb  string
	jobEnqueueableVerb string

	// ignorable makes the callbacks fall back to a permissive result instead
	// of a failure when a request to the extender returns an error.
	ignorable bool
}
// extenderPlugin is the scheduler plugin that forwards its callbacks to an
// external service over HTTP.
type extenderPlugin struct {
	// client performs every request issued by send.
	client http.Client
	// config carries the endpoint address, the verbs and the error policy.
	config *extenderConfig
}
// parseExtenderConfig reads the extender settings out of the plugin arguments.
//
// String-valued settings that are absent, or whose value is not a string, are
// left empty. ignorable is filled in through arguments.GetBool. The HTTP
// timeout defaults to one second and is replaced only by a value accepted by
// time.ParseDuration; a value that fails to parse is reported with a warning
// and the default is kept.
func parseExtenderConfig(arguments framework.Arguments) *extenderConfig {
	/*
	   actions: "reclaim, allocate, backfill, preempt"
	   tiers:
	   - plugins:
	     - name: priority
	     - name: gang
	     - name: conformance
	   - plugins:
	     - name: drf
	     - name: predicates
	     - name: extender
	       arguments:
	         extender.urlPrefix: http://127.0.0.1
	         extender.httpTimeout: 100ms
	         extender.onSessionOpenVerb: onSessionOpen
	         extender.onSessionCloseVerb: onSessionClose
	         extender.predicateVerb: predicate
	         extender.prioritizeVerb: prioritize
	         extender.preemptableVerb: preemptable
	         extender.reclaimableVerb: reclaimable
	         extender.queueOverusedVerb: queueOverused
	         extender.jobEnqueueableVerb: jobEnqueueable
	         extender.ignorable: true
	     - name: proportion
	     - name: nodeorder
	*/

	// str yields the argument stored under key when it is a string, and the
	// empty string otherwise.
	str := func(key string) string {
		value, _ := arguments[key].(string)
		return value
	}

	ec := &extenderConfig{
		urlPrefix:          str(ExtenderURLPrefix),
		httpTimeout:        time.Second,
		onSessionOpenVerb:  str(ExtenderOnSessionOpenVerb),
		onSessionCloseVerb: str(ExtenderOnSessionCloseVerb),
		predicateVerb:      str(ExtenderPredicateVerb),
		prioritizeVerb:     str(ExtenderPrioritizeVerb),
		preemptableVerb:    str(ExtenderPreemptableVerb),
		reclaimableVerb:    str(ExtenderReclaimableVerb),
		queueOverusedVerb:  str(ExtenderQueueOverusedVerb),
		jobEnqueueableVerb: str(ExtenderJobEnqueueableVerb),
	}
	arguments.GetBool(&ec.ignorable, ExtenderIgnorable)

	if raw := str(ExtenderHTTPTimeout); raw != "" {
		timeout, err := time.ParseDuration(raw)
		if err != nil {
			// Keep the default, but make the misconfiguration visible instead
			// of dropping it silently.
			klog.Warningf("Invalid %s %q, keeping default %v: %v", ExtenderHTTPTimeout, raw, ec.httpTimeout, err)
		} else {
			ec.httpTimeout = timeout
		}
	}
	return ec
}
// New builds the extender plugin from the given plugin arguments. The HTTP
// client of the returned plugin uses the configured timeout.
func New(arguments framework.Arguments) framework.Plugin {
	config := parseExtenderConfig(arguments)
	klog.V(4).Infof("Initialize extender plugin with endpoint address %s", config.urlPrefix)

	plugin := &extenderPlugin{config: config}
	plugin.client.Timeout = config.httpTimeout
	return plugin
}
// Name reports the plugin name, which is always PluginName.
func (ep *extenderPlugin) Name() string {
	return PluginName
}
// OnSessionOpen notifies the extender that a session has been opened and then
// registers, for every verb that is configured, a session callback that
// forwards the decision to the extender.
//
// If the session-open notification fails and the extender is not ignorable,
// the method returns without registering any callback. Inside the callbacks a
// failed request is always logged; with ignorable set the callback then
// answers permissively (nil error, nil scores, util.Permit, or "not
// overused"), otherwise it answers with the error, util.Reject, or "overused".
func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
	if ep.config.onSessionOpenVerb != "" {
		err := ep.send(ep.config.onSessionOpenVerb, &OnSessionOpenRequest{
			Jobs:           ssn.Jobs,
			Nodes:          ssn.Nodes,
			Queues:         ssn.Queues,
			NamespaceInfo:  ssn.NamespaceInfo,
			RevocableNodes: ssn.RevocableNodes,
		}, nil)
		if err != nil {
			klog.Warningf("OnSessionOpen failed with error %v", err)
			if !ep.config.ignorable {
				return
			}
		}
	}

	if ep.config.predicateVerb != "" {
		ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
			resp := &PredicateResponse{}
			err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp)
			if err != nil {
				klog.Warningf("Predicate failed with error %v", err)
				if ep.config.ignorable {
					return nil
				}
				return err
			}
			// A non-empty error message from the extender becomes the
			// callback's error.
			if resp.ErrorMessage == "" {
				return nil
			}
			return errors.New(resp.ErrorMessage)
		})
	}

	if ep.config.prioritizeVerb != "" {
		ssn.AddBatchNodeOrderFn(ep.Name(), func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
			resp := &PrioritizeResponse{}
			err := ep.send(ep.config.prioritizeVerb, &PrioritizeRequest{Task: task, Nodes: nodes}, resp)
			if err != nil {
				klog.Warningf("Prioritize failed with error %v", err)
				if ep.config.ignorable {
					return nil, nil
				}
				return nil, err
			}
			if resp.ErrorMessage != "" {
				return nil, errors.New(resp.ErrorMessage)
			}
			if resp.NodeScore == nil {
				// Still an error, as before, but with a message that says
				// what went wrong instead of an empty string.
				return nil, errors.New("extender returned no node scores")
			}
			return resp.NodeScore, nil
		})
	}

	if ep.config.preemptableVerb != "" {
		ssn.AddPreemptableFn(ep.Name(), func(evictor *api.TaskInfo, evictees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			resp := &PreemptableResponse{}
			err := ep.send(ep.config.preemptableVerb, &PreemptableRequest{Evictor: evictor, Evictees: evictees}, resp)
			if err != nil {
				klog.Warningf("Preemptable failed with error %v", err)
				if ep.config.ignorable {
					return nil, util.Permit
				}
				return nil, util.Reject
			}
			return resp.Victims, resp.Status
		})
	}

	if ep.config.reclaimableVerb != "" {
		ssn.AddReclaimableFn(ep.Name(), func(evictor *api.TaskInfo, evictees []*api.TaskInfo) ([]*api.TaskInfo, int) {
			resp := &ReclaimableResponse{}
			err := ep.send(ep.config.reclaimableVerb, &ReclaimableRequest{Evictor: evictor, Evictees: evictees}, resp)
			if err != nil {
				klog.Warningf("Reclaimable failed with error %v", err)
				if ep.config.ignorable {
					return nil, util.Permit
				}
				return nil, util.Reject
			}
			return resp.Victims, resp.Status
		})
	}

	if ep.config.jobEnqueueableVerb != "" {
		ssn.AddJobEnqueueableFn(ep.Name(), func(obj interface{}) int {
			job := obj.(*api.JobInfo)
			resp := &JobEnqueueableResponse{}
			// Send to the job-enqueueable verb, the one this callback is
			// gated on, not to the reclaimable verb.
			err := ep.send(ep.config.jobEnqueueableVerb, &JobEnqueueableRequest{Job: job}, resp)
			if err != nil {
				klog.Warningf("JobEnqueueable failed with error %v", err)
				if ep.config.ignorable {
					return util.Permit
				}
				return util.Reject
			}
			return resp.Status
		})
	}

	if ep.config.queueOverusedVerb != "" {
		ssn.AddOverusedFn(ep.Name(), func(obj interface{}) bool {
			queue := obj.(*api.QueueInfo)
			resp := &QueueOverusedResponse{}
			// Send to the queue-overused verb, the one this callback is
			// gated on, not to the reclaimable verb.
			err := ep.send(ep.config.queueOverusedVerb, &QueueOverusedRequest{Queue: queue}, resp)
			if err != nil {
				klog.Warningf("QueueOverused failed with error %v", err)
				// On failure the queue is reported as overused unless errors
				// are ignorable.
				return !ep.config.ignorable
			}
			return resp.Overused
		})
	}
}
// OnSessionClose sends an empty session-close request to the extender when a
// session-close verb is configured. A failed request is only logged.
func (ep *extenderPlugin) OnSessionClose(ssn *framework.Session) {
	verb := ep.config.onSessionCloseVerb
	if verb == "" {
		return
	}

	err := ep.send(verb, &OnSessionCloseRequest{}, nil)
	if err == nil {
		return
	}
	klog.Warningf("OnSessionClose failed with error %v", err)
}
// send POSTs args, encoded as JSON, to the URL formed by joining the
// configured URL prefix and action with a single slash. Any response status
// other than 200 OK is an error. When result is non-nil the response body is
// decoded from JSON into it; when result is nil the body is not read.
//
// Encoding, request-construction and decoding errors are wrapped with the
// action (and URL) so that callers' log lines identify the failing step.
func (ep *extenderPlugin) send(action string, args interface{}, result interface{}) error {
	payload, err := json.Marshal(args)
	if err != nil {
		return fmt.Errorf("encoding %v request: %w", action, err)
	}

	url := strings.TrimRight(ep.config.urlPrefix, "/") + "/" + action
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("building %v request for URL %v: %w", action, url, err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := ep.client.Do(req)
	if err != nil {
		// Errors from Client.Do are *url.Error values that already carry the
		// method and URL, so no extra context is added here.
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
	}
	if result == nil {
		return nil
	}
	if err := json.NewDecoder(resp.Body).Decode(result); err != nil {
		return fmt.Errorf("decoding %v response from URL %v: %w", action, url, err)
	}
	return nil
}