pkg/scheduler/scheduler.go (132 lines of code) (raw):

/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package scheduler import ( "fmt" "path/filepath" "sync" "time" "github.com/fsnotify/fsnotify" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/klog" "volcano.sh/volcano/pkg/filewatcher" schedcache "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/metrics" ) // Scheduler watches for new unscheduled pods for volcano. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { cache schedcache.Cache schedulerConf string fileWatcher filewatcher.FileWatcher schedulePeriod time.Duration once sync.Once mutex sync.Mutex actions []framework.Action plugins []conf.Tier configurations []conf.Configuration } // NewScheduler returns a scheduler func NewScheduler( config *rest.Config, schedulerName string, schedulerConf string, period time.Duration, defaultQueue string, nodeSelectors []string, ) (*Scheduler, error) { var watcher filewatcher.FileWatcher if schedulerConf != "" { var err error path := filepath.Dir(schedulerConf) watcher, err = filewatcher.NewFileWatcher(path) if err != nil { return nil, fmt.Errorf("failed creating filewatcher for %s: %v", schedulerConf, err) } } scheduler := &Scheduler{ schedulerConf: schedulerConf, fileWatcher: watcher, cache: schedcache.New(config, schedulerName, defaultQueue, nodeSelectors), schedulePeriod: period, } return scheduler, nil } // Run runs the Scheduler func (pc *Scheduler) Run(stopCh <-chan struct{}) { pc.loadSchedulerConf() go pc.watchSchedulerConf(stopCh) // Start cache for policy. pc.cache.Run(stopCh) pc.cache.WaitForCacheSync(stopCh) klog.V(2).Infof("scheduler completes Initialization and start to run") go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh) } func (pc *Scheduler) runOnce() { klog.V(4).Infof("Start scheduling ...") scheduleStartTime := time.Now() defer klog.V(4).Infof("End scheduling ...") pc.mutex.Lock() actions := pc.actions plugins := pc.plugins configurations := pc.configurations pc.mutex.Unlock() ssn := framework.OpenSession(pc.cache, plugins, configurations) defer framework.CloseSession(ssn) for _, action := range actions { actionStartTime := time.Now() action.Execute(ssn) metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime)) } metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime)) } func (pc *Scheduler) loadSchedulerConf() { var err error pc.once.Do(func() { pc.actions, pc.plugins, pc.configurations, err = unmarshalSchedulerConf(defaultSchedulerConf) if err != nil { klog.Errorf("unmarshal scheduler config %s failed: %v", defaultSchedulerConf, err) panic("invalid default configuration") } }) var config string if len(pc.schedulerConf) != 0 { if config, err = readSchedulerConf(pc.schedulerConf); err != nil { klog.Errorf("Failed to read scheduler configuration '%s', using previous configuration: %v", pc.schedulerConf, err) return } } actions, plugins, configurations, err := unmarshalSchedulerConf(config) if err != nil { klog.Errorf("scheduler config %s is invalid: %v", config, err) return } pc.mutex.Lock() // If it is valid, use the new configuration pc.actions = actions pc.plugins = plugins pc.configurations = configurations pc.mutex.Unlock() } func (pc *Scheduler) watchSchedulerConf(stopCh <-chan struct{}) { if pc.fileWatcher == nil { return } eventCh := pc.fileWatcher.Events() errCh := pc.fileWatcher.Errors() for { select { case event, ok := <-eventCh: if !ok { return } klog.V(4).Infof("watch %s event: %v", pc.schedulerConf, event) if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { pc.loadSchedulerConf() } case err, ok := <-errCh: if !ok { return } klog.Infof("watch %s error: %v", pc.schedulerConf, err) case <-stopCh: return } } }