in pkg/scheduler/framework/session.go [93:178]
func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
recorder: cache.EventRecorder(),
cache: cache,
informerFactory: cache.SharedInformerFactory(),
TotalResource: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
namespaceOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
}
snapshot := cache.Snapshot()
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
// only conditions will be updated periodically
if job.PodGroup != nil && job.PodGroup.Status.Conditions != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
}
if vjr := ssn.JobValid(job); vjr != nil {
if !vjr.Pass {
jc := &scheduling.PodGroupCondition{
Type: scheduling.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
TransitionID: string(ssn.UID),
Reason: vjr.Reason,
Message: vjr.Message,
}
if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {
klog.Errorf("Failed to update job condition: %v", err)
}
}
delete(ssn.Jobs, job.UID)
}
}
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.Nodes = snapshot.Nodes
ssn.RevocableNodes = snapshot.RevocableNodes
ssn.Queues = snapshot.Queues
ssn.NamespaceInfo = snapshot.NamespaceInfo
// calculate all nodes' resource only once in each schedule cycle, other plugins can clone it when need
for _, n := range ssn.Nodes {
ssn.TotalResource.Add(n.Allocatable)
}
klog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues",
ssn.UID, len(ssn.Jobs), len(ssn.Queues))
return ssn
}