Skip to content

Commit

Permalink
Merge pull request #60 from SOF3/multi-core-support
Browse files Browse the repository at this point in the history
feat(util): support multiple PodProtector sources
  • Loading branch information
SOF3 authored Jan 7, 2025
2 parents 3003b28 + fc89716 commit cf35382
Show file tree
Hide file tree
Showing 20 changed files with 710 additions and 292 deletions.
84 changes: 40 additions & 44 deletions aggregator/aggregator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/utils/clock"

podseidonv1a1 "github.com/kubewharf/podseidon/apis/v1alpha1"
podseidonv1a1client "github.com/kubewharf/podseidon/client/clientset/versioned/typed/apis/v1alpha1"

"github.com/kubewharf/podseidon/util/component"
"github.com/kubewharf/podseidon/util/defaultconfig"
Expand Down Expand Up @@ -87,25 +86,19 @@ var NewController = component.Declare(
},
func(args ControllerArgs, requests *component.DepRequests) ControllerDeps {
return ControllerDeps{
coreClient: component.DepPtr(
requests,
kube.NewClient(kube.ClientArgs{ClusterName: constants.CoreClusterName}),
),
sourceProvider: args.SourceProvider(requests),
workerClient: component.DepPtr(
requests,
kube.NewClient(kube.ClientArgs{ClusterName: constants.WorkerClusterName}),
),
pprInformer: component.DepPtr(
requests,
pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{
ClusterName: constants.CoreClusterName,
InformerPhase: "leader",
Elector: optional.Some(constants.ElectorArgs),
}),
),
pprInformer: component.DepPtr(requests, pprutil.NewIndexedInformer(pprutil.IndexedInformerArgs{
Suffix: "",
SourceProvider: args.SourceProvider,
Elector: optional.Some(constants.ElectorArgs),
})),
observer: o11y.Request[observer.Observer](requests),
elector: component.DepPtr(requests, kube.NewElector(constants.ElectorArgs)),
worker: component.DepPtr(requests, worker.New[types.NamespacedName](
worker: component.DepPtr(requests, worker.New[pprutil.PodProtectorKey](
"aggregator",
args.Clock,
)),
Expand Down Expand Up @@ -143,21 +136,24 @@ var NewController = component.Declare(
},
)

func DefaultArg() component.Declared[util.Empty] {
func DefaultArg(sourceProviderRequest pprutil.SourceProviderRequest) component.Declared[util.Empty] {
return NewController(ControllerArgs{
InformerSyncTimeAlgos: map[string]synctime.PodInterpreter{
"clock": &synctime.ClockPodInterpreter{Clock: clock.RealClock{}},
"status": synctime.StatusPodInterpreter{},
},
DefaultInformerSyncTimeAlgo: "clock",
Clock: clock.RealClock{},
SourceProvider: sourceProviderRequest,
})
}

type ControllerArgs struct {
InformerSyncTimeAlgos map[string]synctime.PodInterpreter
DefaultInformerSyncTimeAlgo string
Clock clock.WithTicker

SourceProvider pprutil.SourceProviderRequest
}

type ControllerOptions struct {
Expand All @@ -169,13 +165,13 @@ type ControllerOptions struct {
}

type ControllerDeps struct {
coreClient component.Dep[*kube.Client]
workerClient component.Dep[*kube.Client]
pprInformer component.Dep[pprutil.IndexedInformer]
elector component.Dep[*kube.Elector]
observer component.Dep[observer.Observer]
worker component.Dep[worker.Api[types.NamespacedName]]
defaultConfig component.Dep[*defaultconfig.Options]
sourceProvider func() pprutil.SourceProvider
workerClient component.Dep[*kube.Client]
pprInformer component.Dep[pprutil.IndexedInformer]
elector component.Dep[*kube.Elector]
observer component.Dep[observer.Observer]
worker component.Dep[worker.Api[pprutil.PodProtectorKey]]
defaultConfig component.Dep[*defaultconfig.Options]
}

type ControllerState struct {
Expand All @@ -194,9 +190,9 @@ type Sets = *labelindex.Locked[
type Caches struct {
podIndex Sets

pprClient podseidonv1a1client.PodProtectorsGetter
podLister corev1listers.PodLister
pprInformer pprutil.IndexedInformer
sourceProvider pprutil.SourceProvider
podLister corev1listers.PodLister
pprInformer pprutil.IndexedInformer

informerSyncReader synctime.Reader

Expand Down Expand Up @@ -237,7 +233,7 @@ func initController(
state := &ControllerState{
caches: Caches{
podIndex: podIndex,
pprClient: deps.coreClient.Get().PodseidonClientSet().PodseidonV1alpha1(),
sourceProvider: deps.sourceProvider(),
podLister: podLister,
pprInformer: deps.pprInformer.Get(),
informerSyncReader: informerSyncReader,
Expand All @@ -248,7 +244,7 @@ func initController(

queue := deps.worker.Get()
queue.SetExecutor(
func(ctx context.Context, item types.NamespacedName) error {
func(ctx context.Context, item pprutil.PodProtectorKey) error {
return reconcile(ctx, args, options, deps, queue, &state.caches, item)
},
map[string]worker.Prereq{
Expand All @@ -258,8 +254,8 @@ func initController(
)
queue.SetBeforeStart(deps.elector.Get().Await)

deps.pprInformer.Get().AddPostHandler(func(nsName types.NamespacedName) {
queue.EnqueueDelayed(nsName, *deps.defaultConfig.Get().AggregationRate)
deps.pprInformer.Get().AddPostHandler(func(key pprutil.PodProtectorKey) {
queue.EnqueueDelayed(key, *deps.defaultConfig.Get().AggregationRate)
})

return state, nil
Expand Down Expand Up @@ -361,7 +357,7 @@ type podEventHandler struct {
podIndex Sets
pprInformer pprutil.IndexedInformer

queue worker.Api[types.NamespacedName]
queue worker.Api[pprutil.PodProtectorKey]
notifyInformerSync synctime.Notifier
nextEventPool *nextEventPool
}
Expand Down Expand Up @@ -395,27 +391,27 @@ func (handler *podEventHandler) handle(ctx context.Context, obj any, stillPresen
return
}

nsName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
podNsName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}

ctx, cancelFunc := handler.observer.StartEnqueue(ctx, observer.StartEnqueue{
Namespace: nsName.Namespace,
Name: nsName.Name,
Namespace: podNsName.Namespace,
Name: podNsName.Name,
Kind: "Pod",
})
defer cancelFunc()

defer handler.observer.EndEnqueue(ctx, observer.EndEnqueue{})

if stillPresent {
handler.podIndex.Track(nsName, pod.Labels)
handler.podIndex.Track(podNsName, pod.Labels)
} else {
handler.podIndex.Untrack(nsName)
handler.podIndex.Untrack(podNsName)
}

if err := handler.notifyInformerSync(pod); err != nil {
handler.observer.EnqueueError(ctx, observer.EnqueueError{
Namespace: nsName.Namespace,
Name: nsName.Name,
Namespace: podNsName.Namespace,
Name: podNsName.Name,
Err: err,
})

Expand Down Expand Up @@ -462,9 +458,9 @@ func reconcile(
args ControllerArgs,
options ControllerOptions,
deps ControllerDeps,
queue worker.Api[types.NamespacedName],
queue worker.Api[pprutil.PodProtectorKey],
caches *Caches,
item types.NamespacedName,
item pprutil.PodProtectorKey,
) error {
obs := deps.observer.Get()

Expand Down Expand Up @@ -508,9 +504,9 @@ func tryReconcile(
ctx context.Context,
options reconcileOptions,
obs observer.Observer,
queue worker.Api[types.NamespacedName],
queue worker.Api[pprutil.PodProtectorKey],
caches *Caches,
queueItem types.NamespacedName,
queueItem pprutil.PodProtectorKey,
) observer.EndReconcile {
pprOpt, err := caches.pprInformer.Get(queueItem)
if err != nil {
Expand Down Expand Up @@ -588,8 +584,8 @@ func tryReconcile(
computedConfig := options.defaultConfig.Compute(optional.Some(ppr.Spec.AdmissionHistoryConfig))
pprutil.Summarize(computedConfig, ppr)

_, err := caches.pprClient.PodProtectors(ppr.Namespace).
UpdateStatus(ctx, ppr, metav1.UpdateOptions{})
err := caches.sourceProvider.
UpdateStatus(ctx, queueItem.SourceName, ppr)
if err != nil {
return observer.EndReconcile{
Err: errors.TagWrapf(
Expand Down Expand Up @@ -727,7 +723,7 @@ func isPodAvailable(
// Clean up obsolete admission history observed by the current aggregation.
func updateLastEventTime(
caches *Caches,
queueItem types.NamespacedName,
queueItem pprutil.PodProtectorKey,
outputStatus *podseidonv1a1.PodProtectorCellStatus,
outputChanged *haschange.Changed,
lastEventTime time.Time,
Expand Down
8 changes: 6 additions & 2 deletions aggregator/aggregator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
"github.com/kubewharf/podseidon/util/errors"
"github.com/kubewharf/podseidon/util/iter"
"github.com/kubewharf/podseidon/util/kube"
o11yklog "github.com/kubewharf/podseidon/util/o11y/klog"
"github.com/kubewharf/podseidon/util/optional"
pprutil "github.com/kubewharf/podseidon/util/podprotector"

"github.com/kubewharf/podseidon/aggregator/aggregator"
"github.com/kubewharf/podseidon/aggregator/observer"
Expand Down Expand Up @@ -280,7 +282,8 @@ func testReconcile(

coreClient := tc.CoreClient()

cmd.MockStartup(ctx, []func(*component.DepRequests){
cmd.MockStartupWithCliArgs(ctx, []func(*component.DepRequests){
o11yklog.RequestKlogArgs,
component.ApiOnly("core-kube", coreClient),
component.ApiOnly("worker-kube", tc.WorkerClient(clk)),
component.ApiOnly("observer-aggregator", obs),
Expand All @@ -295,8 +298,9 @@ func testReconcile(
},
DefaultInformerSyncTimeAlgo: "fake-clock",
Clock: clk,
SourceProvider: pprutil.RequestSingleSourceProvider("core"),
})),
})
}, []string{"--klog-v=6"})

waitAndAssert(ctx, t, coreClient, reconcileCh, tc.ExpectInitial)

Expand Down
12 changes: 6 additions & 6 deletions aggregator/aggregator/next_event_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

"github.com/kubewharf/podseidon/util/optional"
pprutil "github.com/kubewharf/podseidon/util/podprotector"
)

// Pool of items that need to reconcile upon receiving a new informer event.
Expand All @@ -44,19 +44,19 @@ type nextEventPool struct {
lastDrain atomic.Pointer[time.Time]

itemsMu sync.Mutex
items sets.Set[types.NamespacedName]
items sets.Set[pprutil.PodProtectorKey]
}

func newNextEventPool() *nextEventPool {
return &nextEventPool{
undrainedTime: atomic.Pointer[time.Time]{},
lastDrain: atomic.Pointer[time.Time]{},
itemsMu: sync.Mutex{},
items: sets.New[types.NamespacedName](),
items: sets.New[pprutil.PodProtectorKey](),
}
}

func (pool *nextEventPool) Push(item types.NamespacedName) {
func (pool *nextEventPool) Push(item pprutil.PodProtectorKey) {
pool.itemsMu.Lock()
defer pool.itemsMu.Unlock()

Expand All @@ -69,15 +69,15 @@ func (pool *nextEventPool) Push(item types.NamespacedName) {
}

func (pool *nextEventPool) Drain() (
_objects sets.Set[types.NamespacedName],
_objects sets.Set[pprutil.PodProtectorKey],
_objectLatency time.Duration,
_sinceLastDrain optional.Optional[time.Duration],
) {
pool.itemsMu.Lock()
defer pool.itemsMu.Unlock()

old := pool.items
pool.items = sets.New[types.NamespacedName]()
pool.items = sets.New[pprutil.PodProtectorKey]()

undrainedTime := pool.undrainedTime.Swap(nil)
lastDrain := pool.lastDrain.Swap(ptr.To(time.Now()))
Expand Down
1 change: 0 additions & 1 deletion aggregator/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package constants
import "github.com/kubewharf/podseidon/util/kube"

const (
CoreClusterName kube.ClusterName = "core"
WorkerClusterName kube.ClusterName = "worker"
)

Expand Down
2 changes: 1 addition & 1 deletion aggregator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.23.0

require (
github.com/kubewharf/podseidon/apis v0.0.0
github.com/kubewharf/podseidon/client v0.0.0
github.com/kubewharf/podseidon/util v0.0.0
github.com/stretchr/testify v1.10.0
k8s.io/api v0.32.0
Expand Down Expand Up @@ -33,6 +32,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kubewharf/podseidon/client v0.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
3 changes: 2 additions & 1 deletion aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
healthzobserver "github.com/kubewharf/podseidon/util/healthz/observer"
kubeobserver "github.com/kubewharf/podseidon/util/kube/observer"
"github.com/kubewharf/podseidon/util/o11y/metrics"
pprutil "github.com/kubewharf/podseidon/util/podprotector"
pprutilobserver "github.com/kubewharf/podseidon/util/podprotector/observer"
"github.com/kubewharf/podseidon/util/pprof"
"github.com/kubewharf/podseidon/util/util"
Expand All @@ -39,7 +40,7 @@ func main() {
kubeobserver.ProvideElector,
aggregatorobserver.Provide,
pprutilobserver.ProvideInformer,
component.RequireDep(aggregator.DefaultArg()),
component.RequireDep(aggregator.DefaultArg(pprutil.RequestSingleSourceProvider("core"))),
component.RequireDep(updatetrigger.New(updatetrigger.Args{})),
)
}
5 changes: 3 additions & 2 deletions allinone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
healthzobserver "github.com/kubewharf/podseidon/util/healthz/observer"
kubeobserver "github.com/kubewharf/podseidon/util/kube/observer"
"github.com/kubewharf/podseidon/util/o11y/metrics"
pprutil "github.com/kubewharf/podseidon/util/podprotector"
pprutilobserver "github.com/kubewharf/podseidon/util/podprotector/observer"
"github.com/kubewharf/podseidon/util/pprof"
"github.com/kubewharf/podseidon/util/util"
Expand Down Expand Up @@ -48,7 +49,7 @@ func main() {
generatorobserver.Provide,
aggregatorobserver.Provide,
webhookobserver.Provide,
component.RequireDep(aggregator.DefaultArg()),
component.RequireDep(aggregator.DefaultArg(pprutil.RequestSingleSourceProvider("core"))),
component.RequireDep(updatetrigger.New(updatetrigger.Args{})),
component.RequireDep(generator.NewController(
generator.ControllerArgs{
Expand All @@ -58,6 +59,6 @@ func main() {
},
)),
component.RequireDep(monitor.New(monitor.Args{})),
component.RequireDep(webhookserver.New(util.Empty{})),
component.RequireDep(webhookserver.New(webhookserver.Args{SourceProvider: pprutil.RequestSingleSourceProvider("core")})),
)
}
8 changes: 6 additions & 2 deletions util/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func tryRun(requests []func(*component.DepRequests)) error {
// but does not accept configuring flags, does not expose a health check server
// and does not block until shutdown.
// Returns as soon as startup completes to allow the caller to orchestrate integration tests.
func MockStartup(ctx context.Context, requests []func(*component.DepRequests)) component.ApiMap {
func MockStartupWithCliArgs(ctx context.Context, requests []func(*component.DepRequests), cliArgs []string) component.ApiMap {
components := component.ResolveList(requests)

fs := new(pflag.FlagSet)
setupFlags(components, fs)

if err := fs.Parse([]string{}); err != nil {
if err := fs.Parse(cliArgs); err != nil {
panic(err)
}

Expand All @@ -140,6 +140,10 @@ func MockStartup(ctx context.Context, requests []func(*component.DepRequests)) c
return component.NamedComponentsToApiMap(components)
}

func MockStartup(ctx context.Context, requests []func(*component.DepRequests)) component.ApiMap {
return MockStartupWithCliArgs(ctx, requests, []string{})
}

func setupFlags(components []component.NamedComponent, fs *pflag.FlagSet) {
for _, comp := range components {
compFs := flag.FlagSet{Usage: nil}
Expand Down
Loading

0 comments on commit cf35382

Please sign in to comment.