Skip to content

Commit

Permalink
add more docuementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian-McM committed Feb 26, 2025
1 parent 2913f67 commit 6dcf908
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
36 changes: 33 additions & 3 deletions guardian/pkg/asyncutil/chan.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncutil

import "context"
import (
"context"
"errors"
)

// ReadWithContext reads from the given channel and blocks until either an object is pulled off the channel, the context
// is done, or the channel is closed.
func ReadWithContext[S any](ctx context.Context, ch <-chan S) (S, error) {
select {
case <-ctx.Done():
var d S
return d, ctx.Err()
case v := <-ch:
return v, nil
case v, ok := <-ch:
var err error
if !ok {
err = errors.New("channel closed")
}
return v, err
}
}

// WriteNoWait writes the given value to the given channel but doesn't wait to do so if the channel is full. If the
// channel is full then it returns and the return value will be false.
func WriteNoWait[R any](c chan R, o R) bool {
select {
case c <- o:
Expand All @@ -21,6 +46,8 @@ func WriteNoWait[R any](c chan R, o R) bool {
}
}

// ReadNoWait reads a value off the channel if there is one. If the channel is empty, it returns. The second return value
// says whether a values was read off the channel or if it was empty.
func ReadNoWait[R any](c <-chan R) (R, bool) {
select {
case v := <-c:
Expand All @@ -47,6 +74,8 @@ func ReadBatch[R any](c <-chan R, n int) []R {
}
}

// ReadAll reads all the values off the channel and returns them as an array. It doesn't wait for the channel to be
// closed to return.
func ReadAll[R any](c <-chan R) []R {
var out []R
for {
Expand All @@ -62,6 +91,7 @@ func ReadAll[R any](c <-chan R) []R {
}
}

// Clear removes all items from the channel and returns. It doesn't wait for the channel to close to return.
func Clear[R any](c chan R) {
for {
select {
Expand Down
17 changes: 17 additions & 0 deletions guardian/pkg/asyncutil/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncutil

import (
Expand All @@ -18,6 +32,9 @@ type retryRateLimiter[P any, V any] struct {
pChan chan Command[P, V]
}

// NewFunctionCallRateLimiter returns a FuncCallRateLimiter which rate limits the calls to the provided function. When
// Run is called, it guarantees that the given function is not called more than once per waitDuration, and an error
// is returned from Run if the function is called more than maxCalls within the windowDuration.
func NewFunctionCallRateLimiter[P any, V any](waitDuration time.Duration, windowDuration time.Duration, maxCalls int, f func(P) (V, error)) FuncCallRateLimiter[P, V] {
var callTimestamps []time.Time
pChan := make(chan Command[P, V], 100)
Expand Down
20 changes: 20 additions & 0 deletions guardian/pkg/asyncutil/types.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
// Copyright (c) 2025 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asyncutil

// Command represents a command to run asynchronously. It allows for providing paramters to be passed to whatever is
// executing the command and has a channel to wait for the result on.
type Command[P any, R any] struct {
params P
resultChan chan Result[R]
}

// NewCommand creates a new command of the given type with the given parameter. It also returns the result channel that
// the result will be written to when the command is executed.
func NewCommand[P any, R any](params P) (Command[P, R], chan Result[R]) {
resultChan := make(chan Result[R], 1)
return Command[P, R]{params: params, resultChan: resultChan}, resultChan
Expand Down Expand Up @@ -38,6 +56,7 @@ func (c Command[C, R]) ReturnError(err error) {
}
}

// Signaler is an interface used for waiting for and sending simple signals.
type Signaler interface {
Send()
Receive() <-chan struct{}
Expand Down Expand Up @@ -65,6 +84,7 @@ func NewSignaler() Signaler {
return &signaler{ch: make(chan struct{}, 1)}
}

// AsyncErrorBuffer is an error buffer that can be used in multiple routines.
type AsyncErrorBuffer interface {
Write(err error)
Receive() <-chan error
Expand Down

0 comments on commit 6dcf908

Please sign in to comment.