in pkg/scheduler/plugins/gang/gang.go [51:158]
// OnSessionOpen registers the gang plugin's callbacks on ssn: job validation,
// preemptable/reclaimable victim selection, job ordering, job readiness,
// job pipelined voting and job starvation detection. Every callback is
// registered under gp.Name().
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
	// validJobFn returns nil when the job passes validation. Otherwise it
	// returns a failing ValidateResult: obj is not a *api.JobInfo,
	// CheckTaskMinAvailable reports false, or ValidTaskNum is below the job's
	// MinAvailable.
	validJobFn := func(obj interface{}) *api.ValidateResult {
		job, ok := obj.(*api.JobInfo)
		if !ok {
			return &api.ValidateResult{
				Pass:    false,
				Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
			}
		}

		if valid := job.CheckTaskMinAvailable(); !valid {
			return &api.ValidateResult{
				Pass:    false,
				Reason:  v1beta1.NotEnoughPodsOfTaskReason,
				Message: "Not enough valid pods of each task for gang-scheduling",
			}
		}

		vtn := job.ValidTaskNum()
		if vtn < job.MinAvailable {
			return &api.ValidateResult{
				Pass:   false,
				Reason: v1beta1.NotEnoughPodsReason,
				Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
					vtn, job.MinAvailable),
			}
		}
		return nil
	}

	ssn.AddJobValidFn(gp.Name(), validJobFn)

	// preemptableFn picks victims out of preemptees. A task is selected only
	// while its job still has more ready tasks than MinAvailable, so evicting
	// every returned victim never takes a job below MinAvailable. The
	// preemptor is not consulted. The second result is always util.Permit.
	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo
		// Remaining ready-task budget per job; seeded from ReadyTaskNum on
		// first sight of the job and decremented for every victim chosen.
		jobOccupiedMap := map[api.JobID]int32{}

		for _, preemptee := range preemptees {
			job, found := ssn.Jobs[preemptee.Job]
			if !found {
				// A missing key yields the map's zero value (presumably a nil
				// *api.JobInfo), and using it below would panic. A task whose
				// job is unknown to this session is never offered as a victim.
				klog.V(4).Infof("Can not preempt task <%v/%v> because its job <%v> is not found in session",
					preemptee.Namespace, preemptee.Name, preemptee.Job)
				continue
			}

			occupied, seen := jobOccupiedMap[job.UID]
			if !seen {
				occupied = job.ReadyTaskNum()
				jobOccupiedMap[job.UID] = occupied
			}

			if occupied > job.MinAvailable {
				jobOccupiedMap[job.UID] = occupied - 1
				victims = append(victims, preemptee)
			} else {
				klog.V(4).Infof("Can not preempt task <%v/%v> because job %s ready num(%d) <= MinAvailable(%d) for gang-scheduling",
					preemptee.Namespace, preemptee.Name, job.Name, occupied, job.MinAvailable)
			}
		}

		klog.V(4).Infof("Victims from Gang plugins are %+v", victims)

		return victims, util.Permit
	}

	// TODO(k82cn): Support preempt/reclaim batch job.
	ssn.AddReclaimableFn(gp.Name(), preemptableFn)
	ssn.AddPreemptableFn(gp.Name(), preemptableFn)

	// jobOrderFn compares two jobs by readiness: 0 when both or neither are
	// ready, 1 when only l is ready, -1 when only r is ready.
	// NOTE(review): presumably a negative result orders l ahead of r, so jobs
	// that are not yet ready are favoured - confirm against the session's
	// job-order contract.
	jobOrderFn := func(l, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		lReady := lv.Ready()
		rReady := rv.Ready()

		klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t",
			lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady)

		if lReady && rReady {
			return 0
		}

		if lReady {
			return 1
		}

		if rReady {
			return -1
		}

		return 0
	}

	ssn.AddJobOrderFn(gp.Name(), jobOrderFn)

	// The job-ready callback simply reports JobInfo.Ready().
	ssn.AddJobReadyFn(gp.Name(), func(obj interface{}) bool {
		ji := obj.(*api.JobInfo)
		return ji.Ready()
	})

	// pipelinedFn permits a job once its waiting plus ready tasks reach
	// MinAvailable and rejects it otherwise.
	pipelinedFn := func(obj interface{}) int {
		ji := obj.(*api.JobInfo)
		occupied := ji.WaitingTaskNum() + ji.ReadyTaskNum()
		if occupied >= ji.MinAvailable {
			return util.Permit
		}
		return util.Reject
	}
	ssn.AddJobPipelinedFn(gp.Name(), pipelinedFn)

	// jobStarvingFn reports a job as starving while its waiting plus ready
	// tasks are still below MinAvailable.
	jobStarvingFn := func(obj interface{}) bool {
		ji := obj.(*api.JobInfo)
		occupied := ji.WaitingTaskNum() + ji.ReadyTaskNum()
		return occupied < ji.MinAvailable
	}
	ssn.AddJobStarvingFns(gp.Name(), jobStarvingFn)
}