Skip to content

Commit

Permalink
Extract common controller predicates and handlers (#456)
Browse files Browse the repository at this point in the history
* Extract `Lease` predicates

* Extract `ControllerRing` predicate

* Extract `Lease` mapper

* Extract `ControllerRing` mapper
  • Loading branch information
timebertt authored Feb 4, 2025
1 parent 808dc1a commit b93b5c4
Show file tree
Hide file tree
Showing 14 changed files with 793 additions and 175 deletions.
68 changes: 19 additions & 49 deletions pkg/controller/controllerring/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package controllerring

import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -28,10 +26,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardinghandler "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/handler"
shardingpredicate "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/predicate"
)

// ControllerName is the name of this controller.
Expand All @@ -51,59 +49,31 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {

return builder.ControllerManagedBy(mgr).
Named(ControllerName).
For(&shardingv1alpha1.ControllerRing{}, builder.WithPredicates(r.ControllerRingPredicate())).
Watches(&coordinationv1.Lease{}, handler.EnqueueRequestsFromMapFunc(MapLeaseToControllerRing), builder.WithPredicates(r.LeasePredicate())).
For(&shardingv1alpha1.ControllerRing{}, builder.WithPredicates(shardingpredicate.ControllerRingCreatedOrUpdated())).
Watches(
&coordinationv1.Lease{},
handler.EnqueueRequestsFromMapFunc(shardinghandler.MapLeaseToControllerRing),
builder.WithPredicates(r.LeasePredicate()),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: 5,
}).
Complete(r)
}

func (r *Reconciler) ControllerRingPredicate() predicate.Predicate {
return predicate.And(
predicate.GenerationChangedPredicate{},
// ignore deletion of ControllerRings
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(_ event.UpdateEvent) bool { return true },
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
)
}

func MapLeaseToControllerRing(ctx context.Context, obj client.Object) []reconcile.Request {
ring, ok := obj.GetLabels()[shardingv1alpha1.LabelControllerRing]
if !ok {
return nil
}

return []reconcile.Request{{NamespacedName: client.ObjectKey{Name: ring}}}
}

func (r *Reconciler) LeasePredicate() predicate.Predicate {
return predicate.And(
predicate.NewPredicateFuncs(isShardLease),
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
if !ok {
return false
}
newLease, ok := e.ObjectNew.(*coordinationv1.Lease)
if !ok {
return false
}

// only enqueue ring if the shard's state changed
now := r.Clock.Now()
return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
shardingpredicate.IsShardLease(),
predicate.Or(
// react if a shard lease was created or deleted independent of its availability, we want to update
// ControllerRing.status.shards
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(_ event.UpdateEvent) bool { return false },
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
},
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
},
// for update events, only react if the shard's availability changed
shardingpredicate.ShardLeaseAvailabilityChanged(r.Clock),
),
)
}

func isShardLease(obj client.Object) bool {
return obj.GetLabels()[shardingv1alpha1.LabelControllerRing] != ""
}
88 changes: 10 additions & 78 deletions pkg/controller/sharder/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@ limitations under the License.
package sharder

import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardinghandler "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/handler"
shardingpredicate "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/predicate"
)

// ControllerName is the name of this controller.
Expand All @@ -51,85 +47,21 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {

return builder.ControllerManagedBy(mgr).
Named(ControllerName).
For(&shardingv1alpha1.ControllerRing{}, builder.WithPredicates(r.ControllerRingPredicate())).
Watches(&coordinationv1.Lease{}, handler.EnqueueRequestsFromMapFunc(MapLeaseToControllerRing), builder.WithPredicates(r.LeasePredicate())).
For(&shardingv1alpha1.ControllerRing{}, builder.WithPredicates(shardingpredicate.ControllerRingCreatedOrUpdated())).
Watches(
&coordinationv1.Lease{},
handler.EnqueueRequestsFromMapFunc(shardinghandler.MapLeaseToControllerRing),
builder.WithPredicates(r.LeasePredicate()),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: 5,
}).
Complete(r)
}

func (r *Reconciler) ControllerRingPredicate() predicate.Predicate {
return predicate.And(
predicate.GenerationChangedPredicate{},
// ignore deletion of ControllerRings
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(_ event.UpdateEvent) bool { return true },
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
)
}

func MapLeaseToControllerRing(ctx context.Context, obj client.Object) []reconcile.Request {
ring, ok := obj.GetLabels()[shardingv1alpha1.LabelControllerRing]
if !ok {
return nil
}

return []reconcile.Request{{NamespacedName: client.ObjectKey{Name: ring}}}
}

func (r *Reconciler) LeasePredicate() predicate.Predicate {
return predicate.And(
predicate.NewPredicateFuncs(isShardLease),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
lease, ok := e.Object.(*coordinationv1.Lease)
if !ok {
return false
}

// We only need to resync the ring if the new shard is available right away.
// Note: on controller start we will enqueue anyway for the add event of ControllerRings.
return leases.ToState(lease, r.Clock.Now()).IsAvailable()
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
if !ok {
return false
}
newLease, ok := e.ObjectNew.(*coordinationv1.Lease)
if !ok {
return false
}

// We only need to resync the ring if the shard's availability changed.
now := r.Clock.Now()
return leases.ToState(oldLease, now).IsAvailable() != leases.ToState(newLease, now).IsAvailable()
},
DeleteFunc: func(e event.DeleteEvent) bool {
if e.DeleteStateUnknown {
// If we missed the delete event, we cannot know the final state of the shard (available or not) for sure.
// The included object might be stale, as we might have missed a relevant update before as well.
// Leases usually go unavailable before being deleted, so its unlikely that we need to act on this delete
// event.
// Hence, skip enqueing here in favor of periodic resyncs.
return true
}

lease, ok := e.Object.(*coordinationv1.Lease)
if !ok {
return false
}

// We only need to resync the ring if the removed shard was still available.
return leases.ToState(lease, r.Clock.Now()).IsAvailable()
},
},
shardingpredicate.IsShardLease(),
shardingpredicate.ShardLeaseAvailabilityChanged(r.Clock),
)
}

func isShardLease(obj client.Object) bool {
return obj.GetLabels()[shardingv1alpha1.LabelControllerRing] != ""
}
57 changes: 10 additions & 47 deletions pkg/controller/shardlease/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,23 @@ limitations under the License.
package shardlease

import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardinghandler "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/handler"
shardingpredicate "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/predicate"
)

// ControllerName is the name of this controller.
const ControllerName = "shardlease"

var handlerLog = logf.Log.WithName("controller").WithName(ControllerName)

// AddToManager adds Reconciler to the given manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
if r.Client == nil {
Expand All @@ -53,55 +47,24 @@ func (r *Reconciler) AddToManager(mgr manager.Manager) error {
Named(ControllerName).
For(&coordinationv1.Lease{}, builder.WithPredicates(r.LeasePredicate())).
// enqueue all Leases belonging to a ControllerRing when it is created or the spec is updated
Watches(&shardingv1alpha1.ControllerRing{}, handler.EnqueueRequestsFromMapFunc(r.MapControllerRingToLeases), builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(
&shardingv1alpha1.ControllerRing{},
handler.EnqueueRequestsFromMapFunc(shardinghandler.MapControllerRingToLeases(r.Client)),
builder.WithPredicates(shardingpredicate.ControllerRingCreatedOrUpdated()),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: 5,
}).
Complete(r)
}

func (r *Reconciler) LeasePredicate() predicate.Predicate {
// ignore deletion of shard leases
return predicate.And(
predicate.NewPredicateFuncs(isShardLease),
shardingpredicate.IsShardLease(),
shardingpredicate.ShardLeaseStateChanged(r.Clock),
// ignore deletion of shard leases
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
if !ok {
return false
}
newLease, ok := e.ObjectNew.(*coordinationv1.Lease)
if !ok {
return false
}

now := r.Clock.Now()
return leases.ToState(oldLease, now) != leases.ToState(newLease, now)
},
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
)
}

func (r *Reconciler) MapControllerRingToLeases(ctx context.Context, obj client.Object) []reconcile.Request {
controllerRing := obj.(*shardingv1alpha1.ControllerRing)

leaseList := &coordinationv1.LeaseList{}
if err := r.Client.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
handlerLog.Error(err, "failed listing Leases for ControllerRing", "controllerRing", client.ObjectKeyFromObject(controllerRing))
return nil
}

requests := make([]reconcile.Request, 0, len(leaseList.Items))
for _, l := range leaseList.Items {
lease := l
requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&lease)})
}

return requests
}

func isShardLease(obj client.Object) bool {
return obj.GetLabels()[shardingv1alpha1.LabelControllerRing] != ""
}
54 changes: 54 additions & 0 deletions pkg/sharding/handler/controllerring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2025 Tim Ebert.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handler

import (
"context"

coordinationv1 "k8s.io/api/coordination/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
)

var handlerLog = logf.Log.WithName("handler")

// MapControllerRingToLeases maps a ControllerRing to all matching shard leases.
func MapControllerRingToLeases(reader client.Reader) handler.MapFunc {
return func(ctx context.Context, obj client.Object) []reconcile.Request {
controllerRing, ok := obj.(*shardingv1alpha1.ControllerRing)
if !ok {
return nil
}

leaseList := &coordinationv1.LeaseList{}
if err := reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
handlerLog.Error(err, "failed listing Leases for ControllerRing", "controllerRing", client.ObjectKeyFromObject(controllerRing))
return nil
}

requests := make([]reconcile.Request, 0, len(leaseList.Items))
for _, lease := range leaseList.Items {
requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&lease)})
}

return requests
}
}
Loading

0 comments on commit b93b5c4

Please sign in to comment.