pkg/scheduler/framework/statement.go (315 lines of code) (raw):
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"k8s.io/klog"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
)
// Operation identifies the kind of tentative change a Statement records.
type Operation int8

// Operation kinds recorded by a Statement. They are untyped integer constants
// (0, 1, 2) that convert implicitly wherever an Operation is expected.
const (
	Evict    = iota // Evict records a task eviction.
	Pipeline        // Pipeline records a task pipelined onto a node.
	Allocate        // Allocate records a task allocated to a node.
)
// operation is one entry in a Statement's log: the kind of change, the task it
// was applied to and, for evictions, the reason passed to Statement.Evict.
type operation struct {
	name   Operation     // kind of change: Evict, Pipeline or Allocate
	task   *api.TaskInfo // task the change was applied to
	reason string        // eviction reason; only set by Statement.Evict
}
// Statement accumulates tentative scheduling decisions (evictions, pipelines
// and allocations) against a Session. Each public method applies its change to
// the session's in-memory job/node state right away and records an operation.
// Commit later replays the recorded operations against the session cache, and
// Discard undoes them in reverse order.
type Statement struct {
	operations []operation // recorded operations, in the order they were made
	ssn        *Session    // session whose jobs, nodes and cache are mutated
}
// NewStatement creates an empty Statement bound to ssn. No operations are
// recorded until Evict, Pipeline or Allocate is called on it.
func NewStatement(ssn *Session) *Statement {
	stmt := new(Statement)
	stmt.ssn = ssn
	return stmt
}
// Evict tentatively evicts reclaimee within the session. It marks the task
// Releasing in its job, updates the task on its node, fires every
// DeallocateFunc event handler and records an Evict operation carrying reason.
// The cache is not touched until Commit; Discard undoes the change via unevict.
//
// A failure to update the job status, or a missing job, is only logged. A
// failure to update the node is logged and returned; in that case no handler
// fires and no operation is recorded.
// NOTE(review): on that node-update failure path the job status has already
// been switched to Releasing and is not restored — confirm callers cope.
func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error {
	// Update status in session
	if job, found := s.ssn.Jobs[reclaimee.Job]; found {
		if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil {
			klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				reclaimee.Namespace, reclaimee.Name, api.Releasing, s.ssn.UID, err)
		}
	} else {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when evicting.",
			reclaimee.Job, s.ssn.UID)
	}

	// Update task in node.
	if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {
		if err := node.UpdateTask(reclaimee); err != nil {
			klog.Errorf("Failed to update task <%v/%v> in node %v for: %s",
				reclaimee.Namespace, reclaimee.Name, reclaimee.NodeName, err.Error())
			return err
		}
	}

	for _, eh := range s.ssn.eventHandlers {
		if eh.DeallocateFunc != nil {
			eh.DeallocateFunc(&Event{
				Task: reclaimee,
			})
		}
	}

	s.operations = append(s.operations, operation{
		name:   Evict,
		task:   reclaimee,
		reason: reason,
	})

	return nil
}
// evict is the Commit step for an Evict operation: it asks the session cache
// to evict reclaimee with the given reason. If the cache rejects the eviction,
// the session-side changes made by Evict are rolled back via unevict (a
// rollback failure is only logged) and the cache's error is returned.
func (s *Statement) evict(reclaimee *api.TaskInfo, reason string) error {
	if err := s.ssn.cache.Evict(reclaimee, reason); err != nil {
		if e := s.unevict(reclaimee); e != nil {
			klog.Errorf("Failed to unevict task <%v/%v>: %v.",
				reclaimee.Namespace, reclaimee.Name, e)
		}
		return err
	}
	return nil
}
// unevict reverses the session-side effects of Evict for reclaimee. It sets the
// task back to Running in its job, updates it on its node and fires every
// AllocateFunc event handler.
//
// A job-status failure, or a missing job, is only logged. A node-update
// failure is logged and returned before any handler runs.
func (s *Statement) unevict(reclaimee *api.TaskInfo) error {
	// Update status in session
	job, found := s.ssn.Jobs[reclaimee.Job]
	if found {
		if err := job.UpdateTaskStatus(reclaimee, api.Running); err != nil {
			// Report the status actually being set (Running), not Releasing.
			klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				reclaimee.Namespace, reclaimee.Name, api.Running, s.ssn.UID, err)
		}
	} else {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when unevicting.",
			reclaimee.Job, s.ssn.UID)
	}

	// Update task in node.
	if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {
		if err := node.UpdateTask(reclaimee); err != nil {
			klog.Errorf("Failed to update task <%v/%v> in node %v for: %s",
				reclaimee.Namespace, reclaimee.Name, reclaimee.NodeName, err.Error())
			return err
		}
	}

	for _, eh := range s.ssn.eventHandlers {
		if eh.AllocateFunc != nil {
			eh.AllocateFunc(&Event{
				Task: reclaimee,
			})
		}
	}
	return nil
}
// Pipeline tentatively places task on the node named hostname without binding
// it. It marks the task Pipelined in its job, sets task.NodeName, adds the task
// to the node, fires every AllocateFunc event handler and records a Pipeline
// operation. Discard undoes it via unpipeline.
//
// Every lookup or update failure is only logged: the operation is still
// recorded and nil is always returned.
// NOTE(review): callers get no signal when the job or node is missing —
// confirm this best-effort behaviour is intended.
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
	job, found := s.ssn.Jobs[task.Job]
	if found {
		if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
			klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				task.Namespace, task.Name, api.Pipelined, s.ssn.UID, err)
		}
	} else {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when pipelining.",
			task.Job, s.ssn.UID)
	}

	task.NodeName = hostname

	if node, found := s.ssn.Nodes[hostname]; found {
		if err := node.AddTask(task); err != nil {
			klog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v",
				task.Namespace, task.Name, hostname, s.ssn.UID, err)
		}
		klog.V(3).Infof("After pipelined Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
			task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
	} else {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when pipelining.",
			hostname, s.ssn.UID)
	}

	for _, eh := range s.ssn.eventHandlers {
		if eh.AllocateFunc != nil {
			eh.AllocateFunc(&Event{
				Task: task,
			})
		}
	}

	s.operations = append(s.operations, operation{
		name: Pipeline,
		task: task,
	})

	return nil
}
// pipeline is the Commit step for a Pipeline operation. It deliberately does
// nothing: Pipeline already applied the change to the session, and no cache
// call is made for pipelined tasks.
func (s *Statement) pipeline(_ *api.TaskInfo) {}
// unpipeline reverses Pipeline for task. It moves the task back to Pending in
// its job, removes it from the node named task.NodeName, fires every
// DeallocateFunc event handler and finally clears task.NodeName.
// Failures are only logged; it always returns nil.
func (s *Statement) unpipeline(task *api.TaskInfo) error {
	job, found := s.ssn.Jobs[task.Job]
	if found {
		if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
			// Report the status actually being set (Pending), not Pipelined.
			klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
		}
	} else {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when unpipelining.",
			task.Job, s.ssn.UID)
	}

	if node, found := s.ssn.Nodes[task.NodeName]; found {
		if err := node.RemoveTask(task); err != nil {
			klog.Errorf("Failed to unpipeline task <%v/%v> from node <%v> in Session <%v>: %v",
				task.Namespace, task.Name, task.NodeName, s.ssn.UID, err)
		}
		klog.V(3).Infof("After unpipelined Task <%v/%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
			task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
	} else {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when unpipelining.",
			task.NodeName, s.ssn.UID)
	}

	for _, eh := range s.ssn.eventHandlers {
		if eh.DeallocateFunc != nil {
			eh.DeallocateFunc(&Event{
				Task: task,
			})
		}
	}

	// Cleared last: the node lookup and log messages above still need it.
	task.NodeName = ""
	return nil
}
// Allocate tentatively assigns task to nodeInfo within the session. It:
//   - resolves and reserves the pod's volumes in the session cache;
//   - sets task.Pod.Spec.NodeName and task.PodVolumes;
//   - marks the task Allocated in its job;
//   - sets task.NodeName and adds the task to the node;
//   - fires every AllocateFunc event handler;
//   - records an Allocate operation, which Commit later hands to the cache
//     and Discard undoes via unallocate.
//
// Errors are returned. Once the volumes are reserved, any later failure
// reverts them. If the failure happens after the job status was switched to
// Allocated (node missing or AddTask failing), the status is set back to
// Pending and task.NodeName is cleared. Without that rollback the task would
// stay counted as Allocated with no recorded operation that could undo it.
func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
	podVolumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node)
	if err != nil {
		return err
	}

	hostname := nodeInfo.Name
	if err := s.ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil {
		return err
	}
	// err is the named result, so this sees every error returned below.
	defer func() {
		if err != nil {
			s.ssn.cache.RevertVolumes(task, podVolumes)
		}
	}()

	task.Pod.Spec.NodeName = hostname
	task.PodVolumes = podVolumes

	// Only update status in session
	job, found := s.ssn.Jobs[task.Job]
	if !found {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when binding.",
			task.Job, s.ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}
	if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Allocated, s.ssn.UID, err)
		return err
	}
	// From here on the job sees the task as Allocated. Undo that on failure.
	// Pending mirrors what unallocate restores. Deferred calls run LIFO, so
	// this runs before the volume revert above.
	defer func() {
		if err == nil {
			return
		}
		if rerr := job.UpdateTaskStatus(task, api.Pending); rerr != nil {
			klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				task.Namespace, task.Name, api.Pending, s.ssn.UID, rerr)
		}
		task.NodeName = ""
	}()

	task.NodeName = hostname
	if node, found := s.ssn.Nodes[hostname]; found {
		if err := node.AddTask(task); err != nil {
			klog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
				task.Namespace, task.Name, hostname, s.ssn.UID, err)
			return err
		}
		klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
			task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
	} else {
		klog.Errorf("Failed to find Node <%s> in Session <%s> index when binding.",
			hostname, s.ssn.UID)
		return fmt.Errorf("failed to find node %s", hostname)
	}

	// Callbacks
	for _, eh := range s.ssn.eventHandlers {
		if eh.AllocateFunc != nil {
			eh.AllocateFunc(&Event{
				Task: task,
			})
		}
	}

	// Record the operation so Commit/Discard can act on it.
	klog.V(3).Info("Allocating operations ...")
	s.operations = append(s.operations, operation{
		name: Allocate,
		task: task,
	})
	return nil
}
// allocate is the Commit step for an Allocate operation. It registers task with
// the session cache via AddBindTask, then marks the task Binding in its job.
// Finally it reports the time since the pod's creation timestamp to the
// schedule-duration metric. Any failure is returned; a missing job or a failed
// status update is logged as well. The metric is only reported on success.
func (s *Statement) allocate(task *api.TaskInfo) error {
	if err := s.ssn.cache.AddBindTask(task); err != nil {
		return err
	}

	job, ok := s.ssn.Jobs[task.Job]
	if !ok {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when binding.",
			task.Job, s.ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}

	if err := job.UpdateTaskStatus(task, api.Binding); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Binding, s.ssn.UID, err)
		return err
	}

	createdAt := task.Pod.CreationTimestamp.Time
	metrics.UpdateTaskScheduleDuration(metrics.Duration(createdAt))
	return nil
}
// unallocate reverses the session-side effects of Allocate for task. It:
//   - releases the volumes recorded in task.PodVolumes;
//   - moves the task back to Pending in its job;
//   - removes it from the node named task.NodeName;
//   - fires every DeallocateFunc event handler;
//   - clears task.NodeName.
//
// Problems are logged rather than returned; it always returns nil.
func (s *Statement) unallocate(task *api.TaskInfo) error {
	s.ssn.cache.RevertVolumes(task, task.PodVolumes)

	// Update status in session
	if job, ok := s.ssn.Jobs[task.Job]; !ok {
		klog.Errorf("Failed to find Job <%s> in Session <%s> index when unallocating.",
			task.Job, s.ssn.UID)
	} else if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
		klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
	}

	if node, ok := s.ssn.Nodes[task.NodeName]; ok {
		klog.V(3).Infof("Remove Task <%v> on node <%v>", task.Name, task.NodeName)
		if err := node.RemoveTask(task); err != nil {
			klog.Errorf("Failed to remove Task <%v> on node <%v>: %s", task.Name, task.NodeName, err.Error())
		}
	}

	for _, handler := range s.ssn.eventHandlers {
		if handler.DeallocateFunc == nil {
			continue
		}
		handler.DeallocateFunc(&Event{Task: task})
	}

	task.NodeName = ""
	return nil
}
// Discard rolls back every recorded operation, newest first, so each undo sees
// the state its own operation left behind. Before each rollback it calls
// GenerateLastTxContext on the operation's task. Evictions are undone with
// unevict, pipelines with unpipeline and allocations with unallocate. A failed
// rollback is logged and the remaining operations are still processed.
func (s *Statement) Discard() {
	klog.V(3).Info("Discarding operations ...")
	total := len(s.operations)
	for k := range s.operations {
		op := s.operations[total-1-k]
		op.task.GenerateLastTxContext()

		switch op.name {
		case Evict:
			if err := s.unevict(op.task); err != nil {
				klog.Errorf("Failed to unevict task: %s", err.Error())
			}
		case Pipeline:
			if err := s.unpipeline(op.task); err != nil {
				klog.Errorf("Failed to unpipeline task: %s", err.Error())
			}
		case Allocate:
			if err := s.unallocate(op.task); err != nil {
				klog.Errorf("Failed to unallocate task: %s", err.Error())
			}
		}
	}
}
// Commit applies the recorded operations in the order they were made:
//   - evictions are sent to the session cache (evict);
//   - pipelines need no further work (pipeline);
//   - allocations are handed to the cache's bind queue (allocate).
//
// Before each operation it calls ClearLastTxContext on the operation's task.
//
// Failures are logged, not returned. A failed eviction is rolled back inside
// evict. A failed allocation is rolled back with unallocate, which releases the
// task's volumes and restores its job and node state in the session; this
// mirrors the eviction path instead of leaving the task Allocated on its node.
func (s *Statement) Commit() {
	klog.V(3).Info("Committing operations ...")
	for _, op := range s.operations {
		op.task.ClearLastTxContext()
		switch op.name {
		case Evict:
			if err := s.evict(op.task, op.reason); err != nil {
				klog.Errorf("Failed to evict task: %s", err.Error())
			}
		case Pipeline:
			s.pipeline(op.task)
		case Allocate:
			if err := s.allocate(op.task); err != nil {
				// unallocate reverts op.task.PodVolumes itself, so no
				// separate RevertVolumes call is needed here.
				if e := s.unallocate(op.task); e != nil {
					klog.Errorf("Failed to unallocate task <%v/%v>: %v",
						op.task.Namespace, op.task.Name, e)
				}
				klog.Errorf("Failed to allocate task <%v/%v>: %v",
					op.task.Namespace, op.task.Name, err)
			}
		}
	}
}