pkg/controllers/job/job_controller.go (261 lines of code) (raw):
/*
Copyright 2017 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"hash"
"hash/fnv"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
kubeschedulinginformers "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
kubeschedulinglisters "k8s.io/client-go/listers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
batchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
vcscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/apis/pkg/client/informers/externalversions"
batchinformer "volcano.sh/apis/pkg/client/informers/externalversions/batch/v1alpha1"
businformer "volcano.sh/apis/pkg/client/informers/externalversions/bus/v1alpha1"
schedulinginformers "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1"
buslister "volcano.sh/apis/pkg/client/listers/bus/v1alpha1"
schedulinglisters "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/controllers/job/state"
)
func init() {
framework.RegisterController(&jobcontroller{})
}
// jobcontroller the Job jobcontroller type.
type jobcontroller struct {
kubeClient kubernetes.Interface
vcClient vcclientset.Interface
jobInformer batchinformer.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer schedulinginformers.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer businformer.CommandInformer
pcInformer kubeschedulinginformers.PriorityClassInformer
queueInformer schedulinginformers.QueueInformer
// A store of jobs
jobLister batchlister.JobLister
jobSynced func() bool
// A store of pods
podLister corelisters.PodLister
podSynced func() bool
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced func() bool
// A store of podgroups
pgLister schedulinglisters.PodGroupLister
pgSynced func() bool
// A store of service
svcLister corelisters.ServiceLister
svcSynced func() bool
cmdLister buslister.CommandLister
cmdSynced func() bool
pcLister kubeschedulinglisters.PriorityClassLister
pcSynced func() bool
queueLister schedulinglisters.QueueLister
queueSynced func() bool
// queue that need to sync up
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
cache jobcache.Cache
// Job Event recorder
recorder record.EventRecorder
errTasks workqueue.RateLimitingInterface
workers uint32
maxRequeueNum int
}
func (cc *jobcontroller) Name() string {
return "job-controller"
}
// Initialize creates the new Job job controller.
func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.kubeClient = opt.KubeClient
cc.vcClient = opt.VolcanoClient
sharedInformers := opt.SharedInformerFactory
workers := opt.WorkerNum
// Initialize event client
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
cc.queueList = make([]workqueue.RateLimitingInterface, workers)
cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
cc.recorder = recorder
cc.workers = workers
cc.maxRequeueNum = opt.MaxRequeueNum
if cc.maxRequeueNum < 0 {
cc.maxRequeueNum = -1
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}
cc.jobInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Batch().V1alpha1().Jobs()
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
UpdateFunc: cc.updateJob,
DeleteFunc: cc.deleteJob,
})
cc.jobLister = cc.jobInformer.Lister()
cc.jobSynced = cc.jobInformer.Informer().HasSynced
cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *busv1alpha1.Command:
if v.TargetObject != nil &&
v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
v.TargetObject.Kind == "Job" {
return true
}
return false
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
},
)
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
})
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
cc.svcInformer = sharedInformers.Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
cc.pgInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
cc.pgLister = cc.pgInformer.Lister()
cc.pgSynced = cc.pgInformer.Informer().HasSynced
cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses()
cc.pcLister = cc.pcInformer.Lister()
cc.pcSynced = cc.pcInformer.Informer().HasSynced
cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues()
cc.queueLister = cc.queueInformer.Lister()
cc.queueSynced = cc.queueInformer.Informer().HasSynced
// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
return nil
}
// Run start JobController.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.pcInformer.Informer().Run(stopCh)
go cc.queueInformer.Informer().Run(stopCh)
cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)
go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}
go cc.cache.Run(stopCh)
// Re-sync error tasks.
go wait.Until(cc.processResyncTask, 0, stopCh)
klog.Infof("JobController is running ...... ")
}
func (cc *jobcontroller) worker(i uint32) {
klog.Infof("worker %d start ...... ", i)
for cc.processNextReq(i) {
}
}
func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool {
var hashVal hash.Hash32
var val uint32
hashVal = fnv.New32()
hashVal.Write([]byte(key))
val = hashVal.Sum32()
return val%cc.workers == count
}
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
var hashVal hash.Hash32
var val uint32
hashVal = fnv.New32()
hashVal.Write([]byte(key))
val = hashVal.Sum32()
queue := cc.queueList[val%cc.workers]
return queue
}
func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
req := obj.(apis.Request)
defer queue.Done(req)
key := jobcache.JobKeyByReq(&req)
if !cc.belongsToThisRoutine(key, count) {
klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}
klog.V(3).Infof("Try to handle request <%v>", req)
jobInfo, err := cc.cache.Get(key)
if err != nil {
// TODO(k82cn): ignore not-ready error.
klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return true
}
st := state.NewState(jobInfo)
if st == nil {
klog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return true
}
action := applyPolicies(jobInfo.Job, &req)
klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)
if action != busv1alpha1.SyncJobAction {
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Start to execute action %s ", action))
}
if err := st.Execute(action); err != nil {
if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
queue.AddRateLimited(req)
return true
}
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Job failed on action %s for retry limit reached", action))
klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil {
klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
}
klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
}
// If no error, forget it.
queue.Forget(req)
return true
}