pkg/controller/jobs/rayjob/rayjob_controller.go (207 lines of code) (raw):

/* Copyright 2023 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package rayjob import ( "context" "strings" rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/jobframework" ) var ( gvk = rayjobapi.GroupVersion.WithKind("RayJob") FrameworkName = "ray.io/rayjob" ) const ( headGroupPodSetName = "head" ) func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ SetupIndexes: SetupIndexes, NewReconciler: NewReconciler, SetupWebhook: SetupRayJobWebhook, JobType: &rayjobapi.RayJob{}, AddToScheme: rayjobapi.AddToScheme, })) } // RayJobReconciler reconciles a RayJob object type RayJobReconciler jobframework.JobReconciler func NewReconciler( scheme *runtime.Scheme, client client.Client, record record.EventRecorder, opts ...jobframework.Option) jobframework.JobReconcilerInterface { return (*RayJobReconciler)(jobframework.NewReconciler(scheme, client, record, opts..., )) } type RayJob rayjobapi.RayJob var _ jobframework.GenericJob = (*RayJob)(nil) func (j *RayJob) Object() client.Object { return (*rayjobapi.RayJob)(j) } func (j *RayJob) IsSuspended() bool { return j.Spec.Suspend } func (j *RayJob) IsActive() bool { return j.Status.JobDeploymentStatus != rayjobapi.JobDeploymentStatusSuspended } func (j *RayJob) Suspend() { j.Spec.Suspend = true } func (j *RayJob) ResetStatus() bool { if j.Status.StartTime == nil { return false } j.Status.StartTime = nil return true } func (j *RayJob) GetGVK() schema.GroupVersionKind { return gvk } func (j *RayJob) PodSets() []kueue.PodSet { // len = workerGroups + head podSets := make([]kueue.PodSet, len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1) // head podSets[0] = kueue.PodSet{ Name: headGroupPodSetName, Template: *j.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(), Count: 1, } // workers for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs { wgs := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index] replicas := int32(1) if wgs.Replicas != nil { replicas = *wgs.Replicas } podSets[index+1] = kueue.PodSet{ Name: strings.ToLower(wgs.GroupName), Template: *wgs.Template.DeepCopy(), Count: replicas, } } return podSets } func applySelectors(dst, src map[string]string) map[string]string { if len(dst) == 0 { return src } for k, v := range src { dst[k] = v } return dst } func (j *RayJob) RunWithNodeAffinity(nodeSelectors []jobframework.PodSetNodeSelector) { j.Spec.Suspend = false if len(nodeSelectors) != len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1 { return } // head headPodSpec := &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec headPodSpec.NodeSelector = applySelectors(headPodSpec.NodeSelector, nodeSelectors[0].NodeSelector) // workers for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs { workerPodSpec := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index].Template.Spec workerPodSpec.NodeSelector = applySelectors(workerPodSpec.NodeSelector, nodeSelectors[index+1].NodeSelector) } } func cloneSelectors(src map[string]string) map[string]string { dst := make(map[string]string, len(src)) for k, v := range src { dst[k] = v } return dst } func (j *RayJob) RestoreNodeAffinity(nodeSelectors []jobframework.PodSetNodeSelector) { if len(nodeSelectors) != len(j.Spec.RayClusterSpec.WorkerGroupSpecs)+1 { return } // head headPodSpec := &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec if !equality.Semantic.DeepEqual(headPodSpec.NodeSelector, nodeSelectors[0].NodeSelector) { headPodSpec.NodeSelector = cloneSelectors(nodeSelectors[0].NodeSelector) } // workers for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs { workerPodSpec := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index].Template.Spec if !equality.Semantic.DeepEqual(workerPodSpec.NodeSelector, nodeSelectors[index+1].NodeSelector) { workerPodSpec.NodeSelector = cloneSelectors(nodeSelectors[index+1].NodeSelector) } } } func (j *RayJob) Finished() (metav1.Condition, bool) { condition := metav1.Condition{ Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: string(j.Status.JobStatus), Message: j.Status.Message, } return condition, rayjobapi.IsJobTerminal(j.Status.JobStatus) } func (j *RayJob) EquivalentToWorkload(wl kueue.Workload) bool { if len(wl.Spec.PodSets) != (len(j.Spec.RayClusterSpec.WorkerGroupSpecs) + 1) { return false } podSets := wl.Spec.PodSets headPodSpec := &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec if podSets[0].Count != 1 { return false } if !equality.Semantic.DeepEqual(headPodSpec.InitContainers, podSets[0].Template.Spec.InitContainers) { return false } if !equality.Semantic.DeepEqual(headPodSpec.Containers, podSets[0].Template.Spec.Containers) { return false } // workers for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs { // MinReplicas and MaxReplicas should be taken into account for partial admission. if podSets[index+1].Count != *j.Spec.RayClusterSpec.WorkerGroupSpecs[index].Replicas { return false } workerPodSpec := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index].Template.Spec if !equality.Semantic.DeepEqual(workerPodSpec.InitContainers, podSets[index+1].Template.Spec.InitContainers) { return false } if !equality.Semantic.DeepEqual(workerPodSpec.Containers, podSets[index+1].Template.Spec.Containers) { return false } } return true } func (j *RayJob) PriorityClass() string { if j.Spec.RayClusterSpec != nil { rcs := j.Spec.RayClusterSpec if len(rcs.HeadGroupSpec.Template.Spec.PriorityClassName) != 0 { return rcs.HeadGroupSpec.Template.Spec.PriorityClassName } for wi := range rcs.WorkerGroupSpecs { w := &rcs.WorkerGroupSpecs[wi] if len(w.Template.Spec.PriorityClassName) != 0 { return w.Template.Spec.PriorityClassName } } } return "" } func (j *RayJob) PodsReady() bool { return j.Status.RayClusterStatus.State == rayjobapi.Ready } func (j *RayJob) ReclaimablePods() []kueue.ReclaimablePod { return nil } // SetupWithManager sets up the controller with the Manager. It indexes workloads // based on the owning jobs. func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&rayjobapi.RayJob{}). Owns(&kueue.Workload{}). Complete(r) } func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk) } //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update //+kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=get;list;watch;update;patch //+kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch func (r *RayJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { fjr := (*jobframework.JobReconciler)(r) return fjr.ReconcileGenericJob(ctx, req, &RayJob{}) } func GetWorkloadNameForRayJob(jobName string) string { return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk) }