Skip to content

Commit

Permalink
Move matching labels to association.go (#2734)
Browse files Browse the repository at this point in the history
  • Loading branch information
owenowenisme authored Jan 18, 2025
1 parent 405fe67 commit 5fde3c6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 14 deletions.
19 changes: 19 additions & 0 deletions ray-operator/controllers/ray/common/association.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func RayClusterWorkerPodsAssociationOptions(instance *rayv1.RayCluster) Associat
}
}

func RayClusterRedisCleanupJobAssociationOptions(instance *rayv1.RayCluster) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode),
},
}
}

func RayClusterGroupPodsAssociationOptions(instance *rayv1.RayCluster, group string) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
Expand Down Expand Up @@ -184,3 +194,12 @@ func GetRayClusterHeadPod(ctx context.Context, reader client.Reader, instance *r
}
return &runtimePods.Items[0], nil
}

func RayClusterNetworkResourcesOptions(instance *rayv1.RayCluster) AssociationOptions {
return AssociationOptions{
client.InNamespace(instance.Namespace),
client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
},
}
}
72 changes: 72 additions & 0 deletions ray-operator/controllers/ray/common/association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"testing"

routev1 "github.com/openshift/api/route/v1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -292,3 +293,74 @@ func TestGetRayClusterHeadPod(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, ret, headPod)
}

func TestRayClusterRedisCleanupJobAssociationOptions(t *testing.T) {
// Create a new scheme
newScheme := runtime.NewScheme()
_ = rayv1.AddToScheme(newScheme)
_ = corev1.AddToScheme(newScheme)

instance := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-example",
Namespace: "default",
},
}

_ = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "redis-cleanup",
Namespace: instance.ObjectMeta.Namespace,
Labels: map[string]string{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode),
},
},
}

expected := []client.ListOption{
client.InNamespace(instance.ObjectMeta.Namespace),
client.MatchingLabels(map[string]string{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode),
}),
}
result := RayClusterRedisCleanupJobAssociationOptions(instance).ToListOptions()

assert.Equal(t, expected, result)
}

func TestRayClusterNetworkResourcesOptions(t *testing.T) {
newScheme := runtime.NewScheme()
_ = rayv1.AddToScheme(newScheme)
_ = corev1.AddToScheme(newScheme)
_ = routev1.AddToScheme(newScheme)
instance := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-example",
Namespace: "default",
Annotations: map[string]string{
IngressClassAnnotationKey: "nginx",
},
},
}
_ = &routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateRouteName(instance.Name),
Namespace: instance.Namespace,
Labels: map[string]string{
utils.RayClusterLabelKey: instance.Name,
},
},
}
expected := []client.ListOption{
client.InNamespace(instance.ObjectMeta.Namespace),
client.MatchingLabels(map[string]string{
utils.RayClusterLabelKey: instance.Name,
}),
}

result := RayClusterNetworkResourcesOptions(instance).ToListOptions()

assert.Equal(t, expected, result)
}
24 changes: 10 additions & 14 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// We can start the Redis cleanup process now because the head Pod has been terminated.
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode)}
filterLabels := common.RayClusterRedisCleanupJobAssociationOptions(instance).ToListOptions()
redisCleanupJobs := batchv1.JobList{}
if err := r.List(ctx, &redisCleanupJobs, client.InNamespace(instance.Namespace), filterLabels); err != nil {
if err := r.List(ctx, &redisCleanupJobs, filterLabels...); err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

Expand Down Expand Up @@ -550,8 +549,8 @@ func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *r
func (r *RayClusterReconciler) reconcileRouteOpenShift(ctx context.Context, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)
headRoutes := routev1.RouteList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &headRoutes, client.InNamespace(instance.Namespace), filterLabels); err != nil {
filterLabels := common.RayClusterNetworkResourcesOptions(instance).ToListOptions()
if err := r.List(ctx, &headRoutes, filterLabels...); err != nil {
return err
}

Expand Down Expand Up @@ -581,8 +580,8 @@ func (r *RayClusterReconciler) reconcileRouteOpenShift(ctx context.Context, inst
func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)
headIngresses := networkingv1.IngressList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &headIngresses, client.InNamespace(instance.Namespace), filterLabels); err != nil {
filterLabels := common.RayClusterNetworkResourcesOptions(instance).ToListOptions()
if err := r.List(ctx, &headIngresses, filterLabels...); err != nil {
return err
}

Expand Down Expand Up @@ -613,9 +612,9 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)
services := corev1.ServiceList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
filterLabels := common.RayClusterHeadServiceListOptions(instance)

if err := r.List(ctx, &services, client.InNamespace(instance.Namespace), filterLabels); err != nil {
if err := r.List(ctx, &services, filterLabels...); err != nil {
return err
}

Expand Down Expand Up @@ -1518,11 +1517,8 @@ func (r *RayClusterReconciler) updateEndpoints(ctx context.Context, instance *ra
// We assume we can find the right one by filtering Services with appropriate label selectors
// and picking the first one. We may need to select by name in the future if the Service naming is stable.
rayHeadSvc := corev1.ServiceList{}
filterLabels := client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: "head",
}
if err := r.List(ctx, &rayHeadSvc, client.InNamespace(instance.Namespace), filterLabels); err != nil {
filterLabels := common.RayClusterHeadServiceListOptions(instance)
if err := r.List(ctx, &rayHeadSvc, filterLabels...); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ func TestReconcileHeadService(t *testing.T) {
Labels: map[string]string{
utils.RayClusterLabelKey: cluster.Name,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)),
},
},
}
Expand All @@ -1026,6 +1027,7 @@ func TestReconcileHeadService(t *testing.T) {
headServiceSelector := labels.SelectorFromSet(map[string]string{
utils.RayClusterLabelKey: cluster.Name,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)),
})

// Initialize RayCluster reconciler.
Expand Down

0 comments on commit 5fde3c6

Please sign in to comment.