pkg/controllers/apis/job_info.go (92 lines of code) (raw):
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apis
import (
"fmt"
v1 "k8s.io/api/core/v1"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
)
//JobInfo struct.
type JobInfo struct {
Namespace string
Name string
Job *batch.Job
Pods map[string]map[string]*v1.Pod
}
//Clone function clones the k8s pod values to the JobInfo struct.
func (ji *JobInfo) Clone() *JobInfo {
job := &JobInfo{
Namespace: ji.Namespace,
Name: ji.Name,
Job: ji.Job,
Pods: make(map[string]map[string]*v1.Pod),
}
for key, pods := range ji.Pods {
job.Pods[key] = make(map[string]*v1.Pod)
for pn, pod := range pods {
job.Pods[key][pn] = pod
}
}
return job
}
//SetJob sets the volcano jobs values to the JobInfo struct.
func (ji *JobInfo) SetJob(job *batch.Job) {
ji.Name = job.Name
ji.Namespace = job.Namespace
ji.Job = job
}
//AddPod adds the k8s pod object values to the Pods field
//of JobStruct if it doesn't exist. Otherwise it throws error.
func (ji *JobInfo) AddPod(pod *v1.Pod) error {
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
return fmt.Errorf("failed to find taskName of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
_, found = pod.Annotations[batch.JobVersion]
if !found {
return fmt.Errorf("failed to find jobVersion of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
if _, found := ji.Pods[taskName]; !found {
ji.Pods[taskName] = make(map[string]*v1.Pod)
}
if _, found := ji.Pods[taskName][pod.Name]; found {
return fmt.Errorf("duplicated pod")
}
ji.Pods[taskName][pod.Name] = pod
return nil
}
//UpdatePod updates the k8s pod object values to the existing pod.
func (ji *JobInfo) UpdatePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
return fmt.Errorf("failed to find taskName of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
_, found = pod.Annotations[batch.JobVersion]
if !found {
return fmt.Errorf("failed to find jobVersion of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
if _, found := ji.Pods[taskName]; !found {
return fmt.Errorf("can not find task %s in cache", taskName)
}
if _, found := ji.Pods[taskName][pod.Name]; !found {
return fmt.Errorf("can not find pod <%s/%s> in cache",
pod.Namespace, pod.Name)
}
ji.Pods[taskName][pod.Name] = pod
return nil
}
//DeletePod deletes the given k8s pod from the JobInfo struct.
func (ji *JobInfo) DeletePod(pod *v1.Pod) error {
taskName, found := pod.Annotations[batch.TaskSpecKey]
if !found {
return fmt.Errorf("failed to find taskName of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
_, found = pod.Annotations[batch.JobVersion]
if !found {
return fmt.Errorf("failed to find jobVersion of Pod <%s/%s>",
pod.Namespace, pod.Name)
}
if pods, found := ji.Pods[taskName]; found {
delete(pods, pod.Name)
if len(pods) == 0 {
delete(ji.Pods, taskName)
}
}
return nil
}