Skip to content

Commit

Permalink
skip suspending worker groups if the RayJobDeletionPolicy feature fla…
Browse files Browse the repository at this point in the history
…g is not enabled (#2770)
  • Loading branch information
rueian authored Jan 18, 2025
1 parent d86ea62 commit 405fe67
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.<br />A suspended worker group will have all pods deleted. | | |
| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.<br />A suspended worker group will have all pods deleted.<br />This is not a user-facing API and is only used by RayJob DeletionPolicy. | | |
| `groupName` _string_ | we can have multiple worker groups, we distinguish them by name | | |
| `replicas` _integer_ | Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. | 0 | |
| `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | |
Expand Down
1 change: 1 addition & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type HeadGroupSpec struct {
type WorkerGroupSpec struct {
// Suspend indicates whether a worker group should be suspended.
// A suspended worker group will have all pods deleted.
// This is not a user-facing API and is only used by RayJob DeletionPolicy.
Suspend *bool `json:"suspend,omitempty"`
// we can have multiple worker groups, we distinguish them by name
GroupName string `json:"groupName"`
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ func validateRayClusterSpec(instance *rayv1.RayCluster) error {
// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.ExternalStorageNamespace` instead of
// the annotation `ray.io/external-storage-namespace`.

if !features.Enabled(features.RayJobDeletionPolicy) {
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
if workerGroup.Suspend != nil && *workerGroup.Suspend {
return fmt.Errorf("suspending worker groups is currently available when the RayJobDeletionPolicy feature gate is enabled")
}
}
}

enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
if enableInTreeAutoscaling {
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,10 @@ var _ = Context("Inside the default namespace", func() {
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
Expand Down Expand Up @@ -871,6 +875,10 @@ var _ = Context("Inside the default namespace", func() {
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
Expand Down
16 changes: 16 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3751,7 +3751,20 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
name string
errorMessage string
expectError bool
featureGate bool
}{
{
name: "suspend without autoscaler and the feature gate",
rayCluster: &rayv1.RayCluster{
Spec: rayv1.RayClusterSpec{
HeadGroupSpec: headGroupSpec,
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
},
},
featureGate: false,
expectError: true,
errorMessage: "suspending worker groups is currently available when the RayJobDeletionPolicy feature gate is enabled",
},
{
name: "suspend without autoscaler",
rayCluster: &rayv1.RayCluster{
Expand All @@ -3760,6 +3773,7 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
},
},
featureGate: true,
expectError: false,
},
{
Expand All @@ -3772,13 +3786,15 @@ func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
EnableInTreeAutoscaling: ptr.To[bool](true),
},
},
featureGate: true,
expectError: true,
errorMessage: "suspending worker groups is not currently supported with Autoscaler enabled",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, tt.featureGate)()
err := validateRayClusterSpec(tt.rayCluster)
if tt.expectError {
assert.Error(t, err)
Expand Down
22 changes: 16 additions & 6 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,6 @@ var _ = Context("RayJob with different submission modes", func() {
})

Describe("RayJob with DeletionPolicy=DeleteCluster", Ordered, func() {
features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)

ctx := context.Background()
namespace := "default"
rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletecluster", namespace)
Expand All @@ -848,6 +846,10 @@ var _ = Context("RayJob with different submission modes", func() {
rayJob.Spec.ShutdownAfterJobFinishes = false
rayCluster := &rayv1.RayCluster{}

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Verify RayJob spec", func() {
Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteClusterDeletionPolicy))
})
Expand Down Expand Up @@ -960,8 +962,6 @@ var _ = Context("RayJob with different submission modes", func() {
})

Describe("RayJob with DeletionPolicy=DeleteWorkers", Ordered, func() {
features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)

ctx := context.Background()
namespace := "default"
rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteworkers", namespace)
Expand All @@ -970,6 +970,10 @@ var _ = Context("RayJob with different submission modes", func() {
rayJob.Spec.ShutdownAfterJobFinishes = false
rayCluster := &rayv1.RayCluster{}

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Verify RayJob spec", func() {
Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteWorkersDeletionPolicy))
})
Expand Down Expand Up @@ -1107,6 +1111,10 @@ var _ = Context("RayJob with different submission modes", func() {
rayJob.Spec.ShutdownAfterJobFinishes = false
rayCluster := &rayv1.RayCluster{}

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Create a RayJob custom resource", func() {
err := k8sClient.Create(ctx, rayJob)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
Expand Down Expand Up @@ -1203,8 +1211,6 @@ var _ = Context("RayJob with different submission modes", func() {
})

Describe("RayJob with DeletionPolicy=DeleteNone", Ordered, func() {
features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)

ctx := context.Background()
namespace := "default"
rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletenone", namespace)
Expand All @@ -1213,6 +1219,10 @@ var _ = Context("RayJob with different submission modes", func() {
rayJob.Spec.ShutdownAfterJobFinishes = false
rayCluster := &rayv1.RayCluster{}

BeforeAll(func() {
DeferCleanup(features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true))
})

It("Verify RayJob spec", func() {
Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteNoneDeletionPolicy))
})
Expand Down

0 comments on commit 405fe67

Please sign in to comment.