pkg/scheduler/framework/job_updater.go (82 lines of code) (raw):
package framework
import (
"context"
"math/rand"
"reflect"
"time"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
)
const (
jobUpdaterWorker = 16
jobConditionUpdateTime = time.Minute
jobConditionUpdateTimeJitter = 30 * time.Second
)
// TimeJitterAfter means: new after old + duration + jitter
func TimeJitterAfter(new, old time.Time, duration, maxJitter time.Duration) bool {
var jitter int64
if maxJitter > 0 {
jitter = rand.Int63n(int64(maxJitter))
}
return new.After(old.Add(duration + time.Duration(jitter)))
}
type jobUpdater struct {
ssn *Session
jobQueue []*api.JobInfo
}
func newJobUpdater(ssn *Session) *jobUpdater {
queue := make([]*api.JobInfo, 0, len(ssn.Jobs))
for _, job := range ssn.Jobs {
queue = append(queue, job)
}
ju := &jobUpdater{
ssn: ssn,
jobQueue: queue,
}
return ju
}
func (ju *jobUpdater) UpdateAll() {
workqueue.ParallelizeUntil(context.TODO(), jobUpdaterWorker, len(ju.jobQueue), ju.updateJob)
}
func isPodGroupConditionsUpdated(newCondition, oldCondition []scheduling.PodGroupCondition) bool {
if len(newCondition) != len(oldCondition) {
return true
}
for index, newCond := range newCondition {
oldCond := oldCondition[index]
newTime := newCond.LastTransitionTime
oldTime := oldCond.LastTransitionTime
if TimeJitterAfter(newTime.Time, oldTime.Time, jobConditionUpdateTime, jobConditionUpdateTimeJitter) {
return true
}
// if newCond is not new enough, we treat it the same as the old one
newCond.LastTransitionTime = oldTime
// comparing should ignore the TransitionID
newTransitionID := newCond.TransitionID
newCond.TransitionID = oldCond.TransitionID
shouldUpdate := !reflect.DeepEqual(&newCond, &oldCond)
newCond.LastTransitionTime = newTime
newCond.TransitionID = newTransitionID
if shouldUpdate {
return true
}
}
return false
}
func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) bool {
newCondition := newStatus.Conditions
newStatus.Conditions = nil
oldCondition := oldStatus.Conditions
oldStatus.Conditions = nil
return !reflect.DeepEqual(newStatus, oldStatus) || isPodGroupConditionsUpdated(newCondition, oldCondition)
}
// updateJob update specified job
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
}
}