func()

in pkg/scheduler/plugins/sla/sla.go [83:153]


// OnSessionOpen configures the sla plugin for the given session and registers
// its callbacks on ssn.
//
// It reads the optional global job waiting time from the plugin arguments
// (key JobWaitingTime). The value must be a string accepted by
// time.ParseDuration and must be strictly positive; otherwise the problem is
// logged and sp.jobWaitingTime is left untouched.
//
// It then registers:
//   - a job order function that compares two jobs by the instant obtained by
//     adding the job's waiting time to its creation timestamp. It returns -1
//     when the left job's instant is earlier, 1 when it is later and 0 when
//     they are equal. A job for which readJobWaitingTime returns nil compares
//     as 1 against a job that has a waiting time, and as 0 against another
//     job without one.
//   - a function, registered both as job-enqueueable and job-pipelined
//     callback, that returns util.Permit once the time elapsed since the job's
//     creation is at least its waiting time, and util.Abstain otherwise
//     (including when the job has no waiting time).
func (sp *slaPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter sla plugin ...")
	defer klog.V(4).Infof("Leaving sla plugin.")

	// read in sla waiting time for global cluster from sla plugin arguments
	// if not set, job waiting time still can set in job yaml separately, otherwise job have no sla limits
	if arg, exist := sp.pluginArguments[JobWaitingTime]; exist {
		// Each failure mode is reported exactly once, with the offending
		// value, so a misconfiguration is easy to diagnose from the log.
		if waitTime, ok := arg.(string); !ok {
			klog.Errorf("Invalid type %T for global job waiting time in sla plugin, expected a duration string such as \"1h30m\".", arg)
		} else if jwt, err := time.ParseDuration(waitTime); err != nil {
			klog.Errorf("Error occurs in parsing global job waiting time in sla plugin, err: %s.", err.Error())
		} else if jwt <= 0 {
			klog.Warningf("Invalid global waiting time setting: %s in sla plugin.", jwt.String())
		} else {
			sp.jobWaitingTime = &jwt
			klog.V(4).Infof("Global job waiting time is %s.", sp.jobWaitingTime.String())
		}
	}

	jobOrderFn := func(l, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		lJobWaitingTime := sp.readJobWaitingTime(lv.WaitingTime)
		rJobWaitingTime := sp.readJobWaitingTime(rv.WaitingTime)

		// Jobs without a waiting time compare after jobs that have one.
		switch {
		case lJobWaitingTime == nil && rJobWaitingTime == nil:
			return 0
		case lJobWaitingTime == nil:
			return 1
		case rJobWaitingTime == nil:
			return -1
		}

		// Compute each job's expiry instant once and compare them.
		lDeadline := lv.CreationTimestamp.Add(*lJobWaitingTime)
		rDeadline := rv.CreationTimestamp.Add(*rJobWaitingTime)
		switch {
		case lDeadline.Before(rDeadline):
			return -1
		case lDeadline.After(rDeadline):
			return 1
		}
		return 0
	}
	ssn.AddJobOrderFn(sp.Name(), jobOrderFn)

	permitableFn := func(obj interface{}) int {
		jobInfo := obj.(*api.JobInfo)
		jwt := sp.readJobWaitingTime(jobInfo.WaitingTime)

		// No waiting time, or waiting time not yet elapsed: no opinion.
		if jwt == nil || time.Since(jobInfo.CreationTimestamp.Time) < *jwt {
			return util.Abstain
		}

		return util.Permit
	}
	// if job waiting time is over, turn job to be inqueue in enqueue action
	ssn.AddJobEnqueueableFn(sp.Name(), permitableFn)
	// if job waiting time is over, turn job to be pipelined in allocate action
	ssn.AddJobPipelinedFn(sp.Name(), permitableFn)
}