pkg/scheduler/scheduler.go (132 lines of code) (raw):
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/klog"
"volcano.sh/volcano/pkg/filewatcher"
schedcache "volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/metrics"
)
// Scheduler watches for new unscheduled pods for volcano. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
}
// NewScheduler returns a scheduler
func NewScheduler(
config *rest.Config,
schedulerName string,
schedulerConf string,
period time.Duration,
defaultQueue string,
nodeSelectors []string,
) (*Scheduler, error) {
var watcher filewatcher.FileWatcher
if schedulerConf != "" {
var err error
path := filepath.Dir(schedulerConf)
watcher, err = filewatcher.NewFileWatcher(path)
if err != nil {
return nil, fmt.Errorf("failed creating filewatcher for %s: %v", schedulerConf, err)
}
}
scheduler := &Scheduler{
schedulerConf: schedulerConf,
fileWatcher: watcher,
cache: schedcache.New(config, schedulerName, defaultQueue, nodeSelectors),
schedulePeriod: period,
}
return scheduler, nil
}
// Run runs the Scheduler
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
pc.loadSchedulerConf()
go pc.watchSchedulerConf(stopCh)
// Start cache for policy.
pc.cache.Run(stopCh)
pc.cache.WaitForCacheSync(stopCh)
klog.V(2).Infof("scheduler completes Initialization and start to run")
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
}
func (pc *Scheduler) runOnce() {
klog.V(4).Infof("Start scheduling ...")
scheduleStartTime := time.Now()
defer klog.V(4).Infof("End scheduling ...")
pc.mutex.Lock()
actions := pc.actions
plugins := pc.plugins
configurations := pc.configurations
pc.mutex.Unlock()
ssn := framework.OpenSession(pc.cache, plugins, configurations)
defer framework.CloseSession(ssn)
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
}
func (pc *Scheduler) loadSchedulerConf() {
var err error
pc.once.Do(func() {
pc.actions, pc.plugins, pc.configurations, err = unmarshalSchedulerConf(defaultSchedulerConf)
if err != nil {
klog.Errorf("unmarshal scheduler config %s failed: %v", defaultSchedulerConf, err)
panic("invalid default configuration")
}
})
var config string
if len(pc.schedulerConf) != 0 {
if config, err = readSchedulerConf(pc.schedulerConf); err != nil {
klog.Errorf("Failed to read scheduler configuration '%s', using previous configuration: %v",
pc.schedulerConf, err)
return
}
}
actions, plugins, configurations, err := unmarshalSchedulerConf(config)
if err != nil {
klog.Errorf("scheduler config %s is invalid: %v", config, err)
return
}
pc.mutex.Lock()
// If it is valid, use the new configuration
pc.actions = actions
pc.plugins = plugins
pc.configurations = configurations
pc.mutex.Unlock()
}
func (pc *Scheduler) watchSchedulerConf(stopCh <-chan struct{}) {
if pc.fileWatcher == nil {
return
}
eventCh := pc.fileWatcher.Events()
errCh := pc.fileWatcher.Errors()
for {
select {
case event, ok := <-eventCh:
if !ok {
return
}
klog.V(4).Infof("watch %s event: %v", pc.schedulerConf, event)
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
pc.loadSchedulerConf()
}
case err, ok := <-errCh:
if !ok {
return
}
klog.Infof("watch %s error: %v", pc.schedulerConf, err)
case <-stopCh:
return
}
}
}