in pkg/scheduler/plugins/sla/sla.go [83:153]
// OnSessionOpen wires the sla plugin into the scheduling session.
//
// It performs two steps:
//
//  1. Reads the optional cluster-wide job waiting time from the plugin
//     arguments (key JobWaitingTime). The value must be a string accepted by
//     time.ParseDuration and must be strictly positive; otherwise it is
//     reported once in the log and ignored, leaving sp.jobWaitingTime
//     untouched.
//  2. Registers a job order function and a permit function (used for both
//     the enqueueable and the pipelined hooks) on ssn.
//
// The effective waiting time of a job is obtained through
// sp.readJobWaitingTime (defined elsewhere in this package); a nil result is
// treated here as "this job has no SLA".
func (sp *slaPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter sla plugin ...")
	defer klog.V(4).Infof("Leaving sla plugin.")

	// Read the sla waiting time for the whole cluster from the plugin arguments.
	// If it is not set, a waiting time can still be set per job in the job yaml;
	// otherwise the job has no sla limit.
	if arg, exist := sp.pluginArguments[JobWaitingTime]; exist {
		// Each failure mode is reported exactly once and never falls through to
		// the next check, so the log states the real cause of the rejection.
		if waitTime, ok := arg.(string); !ok {
			klog.Errorf("Error occurs in parsing global job waiting time in sla plugin, err: expected a duration string, got %T.", arg)
		} else if jwt, err := time.ParseDuration(waitTime); err != nil {
			klog.Errorf("Error occurs in parsing global job waiting time in sla plugin, err: %s.", err.Error())
		} else if jwt <= 0 {
			klog.Warningf("Invalid global waiting time setting: %s in sla plugin.", jwt.String())
		} else {
			sp.jobWaitingTime = &jwt
			klog.V(4).Infof("Global job waiting time is %s.", sp.jobWaitingTime.String())
		}
	}

	// jobOrderFn compares two jobs by their SLA deadline, i.e. creation
	// timestamp plus waiting time. It returns -1 when l has the earlier
	// deadline (or only l has a waiting time), 1 in the mirrored cases, and 0
	// when the deadlines are equal or neither job has a waiting time.
	jobOrderFn := func(l, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		lWait := sp.readJobWaitingTime(lv.WaitingTime)
		rWait := sp.readJobWaitingTime(rv.WaitingTime)

		switch {
		case lWait == nil && rWait == nil:
			return 0
		case lWait == nil:
			return 1
		case rWait == nil:
			return -1
		}

		// Both jobs have a waiting time: compute each deadline once and compare.
		lDeadline := lv.CreationTimestamp.Add(*lWait)
		rDeadline := rv.CreationTimestamp.Add(*rWait)

		switch {
		case lDeadline.Before(rDeadline):
			return -1
		case lDeadline.After(rDeadline):
			return 1
		}
		return 0
	}
	ssn.AddJobOrderFn(sp.Name(), jobOrderFn)

	// permitableFn returns util.Permit once the time elapsed since the job's
	// creation has reached its waiting time, and util.Abstain when the job has
	// no waiting time or the waiting time has not elapsed yet.
	permitableFn := func(obj interface{}) int {
		jobInfo := obj.(*api.JobInfo)

		jwt := sp.readJobWaitingTime(jobInfo.WaitingTime)
		if jwt == nil || time.Since(jobInfo.CreationTimestamp.Time) < *jwt {
			return util.Abstain
		}
		return util.Permit
	}

	// if job waiting time is over, turn job to be inqueue in enqueue action
	ssn.AddJobEnqueueableFn(sp.Name(), permitableFn)
	// if job waiting time is over, turn job to be pipelined in allocate action
	ssn.AddJobPipelinedFn(sp.Name(), permitableFn)
}