pkg/scheduler/framework/session.go (393 lines of code) (raw):
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
)
// Session holds the state of a single scheduling session: handles obtained
// from the scheduler cache, the cluster snapshot taken when the session was
// opened, and the callback registries used while the session is active.
type Session struct {
// UID identifies the session; openSession assigns a fresh UUID. It is also
// written into PodGroup conditions as their TransitionID.
UID types.UID
// kubeClient, recorder and informerFactory are taken from the cache in
// openSession; cache is the scheduler cache itself.
kubeClient kubernetes.Interface
recorder record.EventRecorder
cache cache.Cache
informerFactory informers.SharedInformerFactory
// TotalResource is the sum of Allocatable over all nodes in Nodes, computed
// once in openSession.
TotalResource *api.Resource
// podGroupStatus caches a deep copy of each job's PodGroup status taken when
// the session is opened (only for PodGroups whose Conditions are non-nil).
// It should not be mutated after initialization.
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
// Jobs, Nodes, RevocableNodes, Queues and NamespaceInfo come from the cache
// snapshot taken in openSession. Jobs for which JobValid returns a result
// are removed from Jobs at open time.
Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
RevocableNodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
// Tiers and Configurations are not populated by openSession; presumably the
// caller sets them from the scheduler configuration — TODO confirm.
Tiers []conf.Tier
Configurations []conf.Configuration
// NodeList is built by util.GetNodeList from the snapshot's nodes.
NodeList []*api.NodeInfo
// plugins is initialized empty in openSession; presumably keyed by plugin
// name and filled in by code outside this view — TODO confirm.
plugins map[string]Plugin
// eventHandlers are appended through AddEventHandler and invoked by
// Pipeline/Allocate (AllocateFunc) and Evict (DeallocateFunc).
eventHandlers []*EventHandler
// Callback registries, all initialized empty in openSession. Each is keyed
// by a string — presumably the registering plugin's name; the registration
// and lookup helpers are outside this view (TODO confirm).
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
namespaceOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
}
// openSession creates a Session backed by the given scheduler cache and fills
// it from a fresh cache snapshot.
//
// While copying jobs it caches a deep copy of each PodGroup status that has
// conditions, and runs JobValid on every job: any job for which JobValid
// returns a result is removed from the session, and if that result did not
// pass, an Unschedulable condition carrying the result's reason and message is
// recorded on the job first. TotalResource is computed once as the sum of all
// nodes' Allocatable.
func openSession(cache cache.Cache) *Session {
	ssn := &Session{
		UID:             uuid.NewUUID(),
		kubeClient:      cache.Client(),
		recorder:        cache.EventRecorder(),
		cache:           cache,
		informerFactory: cache.SharedInformerFactory(),

		TotalResource:  api.EmptyResource(),
		podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},
		Jobs:           map[api.JobID]*api.JobInfo{},
		Nodes:          map[string]*api.NodeInfo{},
		RevocableNodes: map[string]*api.NodeInfo{},
		Queues:         map[api.QueueID]*api.QueueInfo{},

		plugins:           map[string]Plugin{},
		jobOrderFns:       map[string]api.CompareFn{},
		queueOrderFns:     map[string]api.CompareFn{},
		taskOrderFns:      map[string]api.CompareFn{},
		namespaceOrderFns: map[string]api.CompareFn{},
		clusterOrderFns:   map[string]api.CompareFn{},
		predicateFns:      map[string]api.PredicateFn{},
		bestNodeFns:       map[string]api.BestNodeFn{},
		nodeOrderFns:      map[string]api.NodeOrderFn{},
		batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
		nodeMapFns:        map[string]api.NodeMapFn{},
		nodeReduceFns:     map[string]api.NodeReduceFn{},
		preemptableFns:    map[string]api.EvictableFn{},
		reclaimableFns:    map[string]api.EvictableFn{},
		overusedFns:       map[string]api.ValidateFn{},
		allocatableFns:    map[string]api.AllocatableFn{},
		jobReadyFns:       map[string]api.ValidateFn{},
		jobPipelinedFns:   map[string]api.VoteFn{},
		jobValidFns:       map[string]api.ValidateExFn{},
		jobEnqueueableFns: map[string]api.VoteFn{},
		jobEnqueuedFns:    map[string]api.JobEnqueuedFn{},
		targetJobFns:      map[string]api.TargetJobFn{},
		reservedNodesFns:  map[string]api.ReservedNodesFn{},
		victimTasksFns:    map[string]api.VictimTasksFn{},
		jobStarvingFns:    map[string]api.ValidateFn{},
	}

	snap := cache.Snapshot()

	ssn.Jobs = snap.Jobs
	for _, job := range ssn.Jobs {
		// Only conditions are updated periodically, so the status is cached
		// only when conditions are present.
		if pg := job.PodGroup; pg != nil && pg.Status.Conditions != nil {
			ssn.podGroupStatus[job.UID] = *pg.Status.DeepCopy()
		}

		result := ssn.JobValid(job)
		if result == nil {
			continue
		}

		if !result.Pass {
			cond := &scheduling.PodGroupCondition{
				Type:               scheduling.PodGroupUnschedulableType,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Now(),
				TransitionID:       string(ssn.UID),
				Reason:             result.Reason,
				Message:            result.Message,
			}
			if err := ssn.UpdatePodGroupCondition(job, cond); err != nil {
				klog.Errorf("Failed to update job condition: %v", err)
			}
		}

		// The condition above must be written before the job leaves the map,
		// because UpdatePodGroupCondition looks the job up in ssn.Jobs.
		delete(ssn.Jobs, job.UID)
	}

	ssn.NodeList = util.GetNodeList(snap.Nodes, snap.NodeList)
	ssn.Nodes = snap.Nodes
	ssn.RevocableNodes = snap.RevocableNodes
	ssn.Queues = snap.Queues
	ssn.NamespaceInfo = snap.NamespaceInfo

	// Sum node resources once per scheduling cycle; plugins can clone the
	// total when they need it.
	for _, node := range ssn.Nodes {
		ssn.TotalResource.Add(node.Allocatable)
	}

	klog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues",
		ssn.UID, len(ssn.Jobs), len(ssn.Queues))

	return ssn
}
// closeSession runs the job updater's UpdateAll for the session and then
// drops the session's references to snapshot data and to a subset of its
// plugin state by setting them to nil.
func closeSession(ssn *Session) {
	// UpdateAll must run before the fields below are cleared.
	newJobUpdater(ssn).UpdateAll()

	// Snapshot data.
	ssn.Jobs, ssn.Nodes, ssn.RevocableNodes = nil, nil, nil
	ssn.NodeList = nil
	ssn.TotalResource = nil

	// Plugin state.
	ssn.plugins = nil
	ssn.eventHandlers = nil
	ssn.jobOrderFns, ssn.namespaceOrderFns = nil, nil
	ssn.queueOrderFns, ssn.clusterOrderFns = nil, nil

	klog.V(3).Infof("Close Session %v", ssn.UID)
}
// jobStatus derives the PodGroup status to report for jobInfo.
//
// It starts from a copy of the PodGroup's current status and then:
//   - sets the phase to Unknown when the job has running tasks and carries a
//     true Unschedulable condition stamped with this session's UID;
//   - otherwise sets it to Running when the number of allocated-or-succeeded
//     tasks reaches Spec.MinMember, or to Pending unless the PodGroup is
//     currently Inqueue (in which case the phase is left as is);
//   - refreshes the Running, Failed and Succeeded counters from the task
//     status index.
func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
	result := jobInfo.PodGroup.Status
	tasksByStatus := jobInfo.TaskStatusIndex
	runningCount := len(tasksByStatus[api.Running])

	// Was the job marked unschedulable during this very session?
	markedUnschedulable := false
	for i := range result.Conditions {
		cond := &result.Conditions[i]
		if cond.Type != scheduling.PodGroupUnschedulableType {
			continue
		}
		if cond.Status != v1.ConditionTrue || cond.TransitionID != string(ssn.UID) {
			continue
		}
		markedUnschedulable = true
		break
	}

	if runningCount != 0 && markedUnschedulable {
		// Running tasks on an unschedulable job: the phase is unknown.
		result.Phase = scheduling.PodGroupUnknown
	} else {
		allocated := 0
		for taskStatus, tasks := range tasksByStatus {
			if api.AllocatedStatus(taskStatus) || taskStatus == api.Succeeded {
				allocated += len(tasks)
			}
		}

		switch {
		case int32(allocated) >= jobInfo.PodGroup.Spec.MinMember:
			// Enough resources are allocated, so the job counts as running.
			result.Phase = scheduling.PodGroupRunning
		case jobInfo.PodGroup.Status.Phase != scheduling.PodGroupInqueue:
			result.Phase = scheduling.PodGroupPending
		}
	}

	result.Running = int32(runningCount)
	result.Failed = int32(len(tasksByStatus[api.Failed]))
	result.Succeeded = int32(len(tasksByStatus[api.Succeeded]))

	return result
}
// Statement returns a new Statement whose ssn field points back at this
// session. A fresh Statement is allocated on every call.
func (ssn *Session) Statement() *Statement {
return &Statement{
ssn: ssn,
}
}
// Pipeline marks the task as Pipelined onto the node named hostname. Only
// session state is changed: the task's status in its job, the task's
// NodeName, and the node's task accounting.
//
// Both the job and the node are resolved before anything is mutated, so a
// missing job or node leaves the task untouched. After the node accepts the
// task, every registered event handler's AllocateFunc is invoked.
//
// It returns an error when the job or node is not part of the session, or
// when updating the task status or adding the task to the node fails.
func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {
	job, found := ssn.Jobs[task.Job]
	if !found {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when pipelining.",
			task.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s when pipelining", task.Job)
	}

	node, found := ssn.Nodes[hostname]
	if !found {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when pipelining.",
			hostname, ssn.UID)
		return fmt.Errorf("failed to find node %s", hostname)
	}

	// Only update status in session.
	if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Pipelined, ssn.UID, err)
		return err
	}

	task.NodeName = hostname

	if err := node.AddTask(task); err != nil {
		klog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
			task.Namespace, task.Name, hostname, ssn.UID, err)
		return err
	}
	klog.V(3).Infof("After added Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
		task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)

	for _, eh := range ssn.eventHandlers {
		if eh.AllocateFunc != nil {
			eh.AllocateFunc(&Event{
				Task: task,
			})
		}
	}

	return nil
}
// Allocate assigns the task to nodeInfo within the session.
//
// Steps, in order:
//  1. Obtain the task's pod volumes from the cache and allocate them on the
//     node. If any later step returns an error, the deferred function reverts
//     these volumes.
//  2. Resolve the task's job and the node in the session. Both lookups happen
//     before the task is mutated, so a missing job or node leaves the task
//     unchanged.
//  3. Record the node on the pod spec and task, mark the task Allocated in its
//     job and add it to the node.
//  4. Invoke every registered event handler's AllocateFunc.
//  5. If JobReady reports the job as ready, dispatch every task of the job
//     that is in Allocated status; otherwise revert this task's volumes.
//
// The named result err is what the deferred revert inspects, so every error
// path below must return through it.
func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
	podVolumes, err := ssn.cache.GetPodVolumes(task, nodeInfo.Node)
	if err != nil {
		return err
	}

	hostname := nodeInfo.Name
	if err := ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil {
		return err
	}
	defer func() {
		if err != nil {
			ssn.cache.RevertVolumes(task, podVolumes)
		}
	}()

	job, found := ssn.Jobs[task.Job]
	if !found {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.",
			task.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}

	node, found := ssn.Nodes[hostname]
	if !found {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.",
			hostname, ssn.UID)
		return fmt.Errorf("failed to find node %s", hostname)
	}

	task.Pod.Spec.NodeName = hostname
	task.PodVolumes = podVolumes

	// Only update status in session.
	if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Allocated, ssn.UID, err)
		return err
	}

	task.NodeName = hostname

	if err := node.AddTask(task); err != nil {
		klog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
			task.Namespace, task.Name, hostname, ssn.UID, err)
		return err
	}
	klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
		task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)

	// Callbacks
	for _, eh := range ssn.eventHandlers {
		if eh.AllocateFunc != nil {
			eh.AllocateFunc(&Event{
				Task: task,
			})
		}
	}

	if !ssn.JobReady(job) {
		ssn.cache.RevertVolumes(task, podVolumes)
		return nil
	}

	// dispatch moves each task out of the Allocated index while we range over
	// it; removing map entries during iteration is permitted in Go.
	for _, allocated := range job.TaskStatusIndex[api.Allocated] {
		if err := ssn.dispatch(allocated); err != nil {
			klog.Errorf("Failed to dispatch task <%v/%v>: %v",
				allocated.Namespace, allocated.Name, err)
			return err
		}
	}

	return nil
}
// dispatch hands the task to the cache for binding via AddBindTask, then
// marks it as Binding in its job within the session and reports the task's
// schedule duration, measured from the pod's creation timestamp, to metrics.
//
// It returns the cache's error unchanged, or an error when the job is not in
// the session or the status update fails.
func (ssn *Session) dispatch(task *api.TaskInfo) error {
	if err := ssn.cache.AddBindTask(task); err != nil {
		return err
	}

	// Update status in session
	job, ok := ssn.Jobs[task.Job]
	if !ok {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when binding.",
			task.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}

	if err := job.UpdateTaskStatus(task, api.Binding); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Binding, ssn.UID, err)
		return err
	}

	createdAt := task.Pod.CreationTimestamp.Time
	metrics.UpdateTaskScheduleDuration(metrics.Duration(createdAt))

	return nil
}
// Evict evicts reclaimee through the cache and then mirrors the eviction in
// the session: the task is marked Releasing in its job, the node recorded in
// the task's NodeName (if it is part of the session) is updated, and every
// registered event handler's DeallocateFunc is invoked.
//
// The cache eviction happens first; if it fails, its error is returned and
// the session is left untouched. A node that is not in the session is
// silently skipped.
func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {
	if err := ssn.cache.Evict(reclaimee, reason); err != nil {
		return err
	}

	// Update status in session
	job, found := ssn.Jobs[reclaimee.Job]
	if !found {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when evicting.",
			reclaimee.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s", reclaimee.Job)
	}

	if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			reclaimee.Namespace, reclaimee.Name, api.Releasing, ssn.UID, err)
		return err
	}

	// Update task in node.
	if node, found := ssn.Nodes[reclaimee.NodeName]; found {
		if err := node.UpdateTask(reclaimee); err != nil {
			klog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v",
				reclaimee.Namespace, reclaimee.Name, ssn.UID, err)
			return err
		}
	}

	for _, eh := range ssn.eventHandlers {
		if eh.DeallocateFunc != nil {
			eh.DeallocateFunc(&Event{
				Task: reclaimee,
			})
		}
	}

	return nil
}
// BindPodGroup binds the job's PodGroup to the specified cluster by
// delegating to the session cache's BindPodGroup, and returns the cache's
// error unchanged. No session state is modified here.
func (ssn *Session) BindPodGroup(job *api.JobInfo, cluster string) error {
return ssn.cache.BindPodGroup(job, cluster)
}
// UpdatePodGroupCondition sets cond on the PodGroup of the session's copy of
// jobInfo. An existing condition of the same Type is overwritten in place;
// otherwise cond is appended. The condition is stored by value.
//
// It returns an error, rather than panicking, when jobInfo or cond is nil,
// when the job is not part of the session, or when the job has no PodGroup.
func (ssn *Session) UpdatePodGroupCondition(jobInfo *api.JobInfo, cond *scheduling.PodGroupCondition) error {
	if jobInfo == nil {
		return fmt.Errorf("failed to update condition: job is nil")
	}
	if cond == nil {
		return fmt.Errorf("failed to update condition of job <%s/%s>: condition is nil",
			jobInfo.Namespace, jobInfo.Name)
	}

	job, ok := ssn.Jobs[jobInfo.UID]
	if !ok {
		return fmt.Errorf("failed to find job <%s/%s>", jobInfo.Namespace, jobInfo.Name)
	}
	if job.PodGroup == nil {
		return fmt.Errorf("failed to update condition of job <%s/%s>: job has no PodGroup",
			jobInfo.Namespace, jobInfo.Name)
	}

	conditions := job.PodGroup.Status.Conditions
	for i := range conditions {
		if conditions[i].Type == cond.Type {
			// Writes through the shared backing array, i.e. updates the
			// PodGroup's own condition entry.
			conditions[i] = *cond
			return nil
		}
	}

	job.PodGroup.Status.Conditions = append(conditions, *cond)
	return nil
}
// AddEventHandler appends eh to the session's list of event handlers.
// Handlers are invoked in registration order: AllocateFunc from Pipeline and
// Allocate, DeallocateFunc from Evict. eh is stored as given; no nil check or
// de-duplication is performed.
func (ssn *Session) AddEventHandler(eh *EventHandler) {
ssn.eventHandlers = append(ssn.eventHandlers, eh)
}
// UpdateSchedulerNumaInfo forwards AllocatedSets to the session cache's
// UpdateSchedulerNumaInfo. The map is keyed by string — presumably node
// name; confirm against the cache implementation.
func (ssn *Session) UpdateSchedulerNumaInfo(AllocatedSets map[string]api.ResNumaSets) {
ssn.cache.UpdateSchedulerNumaInfo(AllocatedSets)
}
// KubeClient returns the Kubernetes client the session obtained from the
// cache when it was opened.
// NOTE(review): value receiver — each call copies the Session struct.
func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
}
// InformerFactory returns the SharedInformerFactory the session obtained
// from the cache when it was opened.
// NOTE(review): value receiver — each call copies the Session struct.
func (ssn Session) InformerFactory() informers.SharedInformerFactory {
return ssn.informerFactory
}
// RecordPodGroupEvent records an event of the given type and reason, with msg
// as its message, against podGroup. A nil podGroup is ignored.
//
// The internal PodGroup is first converted to its v1beta1 form through the
// scheduling scheme; if the conversion fails the error is logged and no event
// is recorded. msg is emitted verbatim: it is passed to Event rather than
// Eventf so that any '%' it contains is not interpreted as a format verb.
func (ssn Session) RecordPodGroupEvent(podGroup *schedulingapi.PodGroup, eventType, reason, msg string) {
	if podGroup == nil {
		return
	}

	pg := &vcv1beta1.PodGroup{}
	if err := schedulingscheme.Scheme.Convert(&podGroup.PodGroup, pg, nil); err != nil {
		klog.Errorf("Error while converting PodGroup to v1beta1.PodGroup with error: %v", err)
		return
	}

	ssn.recorder.Event(pg, eventType, reason, msg)
}
// String returns a multi-line description of the session: a header line with
// the session UID, followed by one line per job and then one line per node,
// each formatted with %v. Jobs and nodes are stored in maps, so their order
// within each group is not deterministic.
func (ssn Session) String() string {
	// Append into one buffer instead of re-formatting the whole accumulated
	// message on every iteration.
	buf := []byte(fmt.Sprintf("Session %v: \n", ssn.UID))

	for _, job := range ssn.Jobs {
		buf = append(buf, fmt.Sprintf("%v\n", job)...)
	}

	for _, node := range ssn.Nodes {
		buf = append(buf, fmt.Sprintf("%v\n", node)...)
	}

	return string(buf)
}