diff --git a/docs/reference/api.md b/docs/reference/api.md index 8dfbc787625..f2732b22a22 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -356,7 +356,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.
A suspended worker group will have all pods deleted. | | | +| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.
A suspended worker group will have all pods deleted.
This is not a user-facing API and is only used by RayJob DeletionPolicy. | | | | `groupName` _string_ | we can have multiple worker groups, we distinguish them by name | | | | `replicas` _integer_ | Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. | 0 | | | `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | | diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index 9f032f24b7e..c3f086eb61c 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -72,6 +72,7 @@ type HeadGroupSpec struct { type WorkerGroupSpec struct { // Suspend indicates whether a worker group should be suspended. // A suspended worker group will have all pods deleted. + // This is not a user-facing API and is only used by RayJob DeletionPolicy. Suspend *bool `json:"suspend,omitempty"` // we can have multiple worker groups, we distinguish them by name GroupName string `json:"groupName"` diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 794177651e0..cef514674da 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -262,6 +262,14 @@ func validateRayClusterSpec(instance *rayv1.RayCluster) error { // TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.ExternalStorageNamespace` instead of // the annotation `ray.io/external-storage-namespace`. + if !features.Enabled(features.RayJobDeletionPolicy) { + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + if workerGroup.Suspend != nil && *workerGroup.Suspend { + return fmt.Errorf("suspending worker groups is currently available when the RayJobDeletionPolicy feature gate is enabled") + } + } + } + enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling) if enableInTreeAutoscaling { for _, workerGroup := range instance.Spec.WorkerGroupSpecs { diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go index 47610f67f4a..2ad08c07059 100644 --- a/ray-operator/controllers/ray/raycluster_controller_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_test.go @@ -828,6 +828,10 @@ var _ = Context("Inside the default namespace", func() { workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions() headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Create a RayCluster custom resource", func() { err := k8sClient.Create(ctx, rayCluster) Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster") @@ -871,6 +875,10 @@ var _ = Context("Inside the default namespace", func() { workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions() headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Create a RayCluster custom resource", func() { err := k8sClient.Create(ctx, rayCluster) Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster") diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 6beb74f9804..87d7a502f23 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3751,7 +3751,20 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) { name string errorMessage string expectError bool + featureGate bool }{ + { + name: "suspend without autoscaler and the feature gate", + rayCluster: &rayv1.RayCluster{ + Spec: rayv1.RayClusterSpec{ + HeadGroupSpec: headGroupSpec, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended}, + }, + }, + featureGate: false, + expectError: true, + errorMessage: "suspending worker groups is currently available when the RayJobDeletionPolicy feature gate is enabled", + }, { name: "suspend without autoscaler", rayCluster: &rayv1.RayCluster{ @@ -3760,6 +3773,7 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) { WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended}, }, }, + featureGate: true, expectError: false, }, { @@ -3772,6 +3786,7 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) { EnableInTreeAutoscaling: ptr.To[bool](true), }, }, + featureGate: true, expectError: true, errorMessage: "suspending worker groups is not currently supported with Autoscaler enabled", }, @@ -3779,6 +3794,7 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, tt.featureGate)() err := validateRayClusterSpec(tt.rayCluster) if tt.expectError { assert.Error(t, err) diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go index 91e97d7695d..286c0ff476d 100644 --- a/ray-operator/controllers/ray/rayjob_controller_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_test.go @@ -838,8 +838,6 @@ var _ = Context("RayJob with different submission modes", func() { }) Describe("RayJob with DeletionPolicy=DeleteCluster", Ordered, func() { - features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) - ctx := context.Background() namespace := "default" rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletecluster", namespace) @@ -848,6 +846,10 @@ var _ = Context("RayJob with different submission modes", func() { rayJob.Spec.ShutdownAfterJobFinishes = false rayCluster := &rayv1.RayCluster{} + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Verify RayJob spec", func() { Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteClusterDeletionPolicy)) }) @@ -960,8 +962,6 @@ var _ = Context("RayJob with different submission modes", func() { }) Describe("RayJob with DeletionPolicy=DeleteWorkers", Ordered, func() { - features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) - ctx := context.Background() namespace := "default" rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteworkers", namespace) @@ -970,6 +970,10 @@ var _ = Context("RayJob with different submission modes", func() { rayJob.Spec.ShutdownAfterJobFinishes = false rayCluster := &rayv1.RayCluster{} + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Verify RayJob spec", func() { Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteWorkersDeletionPolicy)) }) @@ -1107,6 +1111,10 @@ var _ = Context("RayJob with different submission modes", func() { rayJob.Spec.ShutdownAfterJobFinishes = false rayCluster := &rayv1.RayCluster{} + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Create a RayJob custom resource", func() { err := k8sClient.Create(ctx, rayJob) Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") @@ -1203,8 +1211,6 @@ var _ = Context("RayJob with different submission modes", func() { }) Describe("RayJob with DeletionPolicy=DeleteNone", Ordered, func() { - features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) - ctx := context.Background() namespace := "default" rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletenone", namespace) @@ -1213,6 +1219,10 @@ var _ = Context("RayJob with different submission modes", func() { rayJob.Spec.ShutdownAfterJobFinishes = false rayCluster := &rayv1.RayCluster{} + BeforeAll(func() { + DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)) + }) + It("Verify RayJob spec", func() { Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteNoneDeletionPolicy)) })