From 6dcf9086aefbc3674d3871e68f8b1056c5cf18f0 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 26 Feb 2025 14:46:04 -0800 Subject: [PATCH] add more docuementation --- guardian/pkg/asyncutil/chan.go | 36 ++++++++++++++++++++++++--- guardian/pkg/asyncutil/ratelimiter.go | 17 +++++++++++++ guardian/pkg/asyncutil/types.go | 20 +++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/guardian/pkg/asyncutil/chan.go b/guardian/pkg/asyncutil/chan.go index d2b23ef125c..3ce336c1d02 100644 --- a/guardian/pkg/asyncutil/chan.go +++ b/guardian/pkg/asyncutil/chan.go @@ -1,17 +1,42 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package asyncutil -import "context" +import ( + "context" + "errors" +) +// ReadWithContext reads from the given channel and blocks until either an object is pulled off the channel, the context +// is done, or the channel is closed. func ReadWithContext[S any](ctx context.Context, ch <-chan S) (S, error) { select { case <-ctx.Done(): var d S return d, ctx.Err() - case v := <-ch: - return v, nil + case v, ok := <-ch: + var err error + if !ok { + err = errors.New("channel closed") + } + return v, err } } +// WriteNoWait writes the given value to the given channel but doesn't wait to do so if the channel is full. If the +// channel is full then it returns and the return value will be false. func WriteNoWait[R any](c chan R, o R) bool { select { case c <- o: @@ -21,6 +46,8 @@ func WriteNoWait[R any](c chan R, o R) bool { } } +// ReadNoWait reads a value off the channel if there is one. If the channel is empty, it returns. The second return value +// says whether a values was read off the channel or if it was empty. func ReadNoWait[R any](c <-chan R) (R, bool) { select { case v := <-c: @@ -47,6 +74,8 @@ func ReadBatch[R any](c <-chan R, n int) []R { } } +// ReadAll reads all the values off the channel and returns them as an array. It doesn't wait for the channel to be +// closed to return. func ReadAll[R any](c <-chan R) []R { var out []R for { @@ -62,6 +91,7 @@ func ReadAll[R any](c <-chan R) []R { } } +// Clear removes all items from the channel and returns. It doesn't wait for the channel to close to return. func Clear[R any](c chan R) { for { select { diff --git a/guardian/pkg/asyncutil/ratelimiter.go b/guardian/pkg/asyncutil/ratelimiter.go index f4794582fd3..a571a2bcb74 100644 --- a/guardian/pkg/asyncutil/ratelimiter.go +++ b/guardian/pkg/asyncutil/ratelimiter.go @@ -1,3 +1,17 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package asyncutil import ( @@ -18,6 +32,9 @@ type retryRateLimiter[P any, V any] struct { pChan chan Command[P, V] } +// NewFunctionCallRateLimiter returns a FuncCallRateLimiter which rate limits the calls to the provided function. When +// Run is called, it guarantees that the given function is not called more than once per waitDuration, and an error +// is returned from Run if the function is called more than maxCalls within the windowDuration. func NewFunctionCallRateLimiter[P any, V any](waitDuration time.Duration, windowDuration time.Duration, maxCalls int, f func(P) (V, error)) FuncCallRateLimiter[P, V] { var callTimestamps []time.Time pChan := make(chan Command[P, V], 100) diff --git a/guardian/pkg/asyncutil/types.go b/guardian/pkg/asyncutil/types.go index 994c2a133f6..8ab13832f2a 100644 --- a/guardian/pkg/asyncutil/types.go +++ b/guardian/pkg/asyncutil/types.go @@ -1,10 +1,28 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package asyncutil +// Command represents a command to run asynchronously. It allows for providing paramters to be passed to whatever is +// executing the command and has a channel to wait for the result on. type Command[P any, R any] struct { params P resultChan chan Result[R] } +// NewCommand creates a new command of the given type with the given parameter. It also returns the result channel that +// the result will be written to when the command is executed. func NewCommand[P any, R any](params P) (Command[P, R], chan Result[R]) { resultChan := make(chan Result[R], 1) return Command[P, R]{params: params, resultChan: resultChan}, resultChan @@ -38,6 +56,7 @@ func (c Command[C, R]) ReturnError(err error) { } } +// Signaler is an interface used for waiting for and sending simple signals. type Signaler interface { Send() Receive() <-chan struct{} @@ -65,6 +84,7 @@ func NewSignaler() Signaler { return &signaler{ch: make(chan struct{}, 1)} } +// AsyncErrorBuffer is an error buffer that can be used in multiple routines. type AsyncErrorBuffer interface { Write(err error) Receive() <-chan error