-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Leonidas Vrachnis <[email protected]>
- Loading branch information
1 parent
a27349a
commit ac91461
Showing
7 changed files
with
205 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package sync | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
) | ||
|
||
// A EagerGroup is a collection of goroutines working on subtasks that are part of | ||
// the same overall task. | ||
// | ||
// Use NewEagerGroup to create a new group. | ||
type EagerGroup struct { | ||
ctx context.Context | ||
cancel context.CancelCauseFunc | ||
wg sync.WaitGroup | ||
sem chan struct{} | ||
errOnce sync.Once | ||
err error | ||
} | ||
|
||
// NewEagerGroup returns a new eager group and an associated Context derived from ctx. | ||
// | ||
// The derived Context is canceled the first time a function passed to Go | ||
// returns a non-nil error or the first time Wait returns, whichever occurs | ||
// first. | ||
// | ||
// limit < 1 means no limit on the number of active goroutines. | ||
func NewEagerGroup(ctx context.Context, limit int) (*EagerGroup, context.Context) { | ||
ctx, cancel := context.WithCancelCause(ctx) | ||
g := &EagerGroup{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
} | ||
if limit > 0 { | ||
g.sem = make(chan struct{}, limit) | ||
} | ||
return g, ctx | ||
} | ||
|
||
// Go calls the given function in a new goroutine. | ||
// It blocks until the new goroutine can be added without the number of | ||
// active goroutines in the group exceeding the configured limit. | ||
// | ||
// The first call to return a non-nil error cancels the group's context. | ||
// The error will be returned by Wait. | ||
// | ||
// If the group was created by calling NewEagerGroup with limit < 1, there is no | ||
// limit on the number of active goroutines. | ||
// | ||
// If the group's context is canceled, routines that have not executed yet due to the limit won't be executed. | ||
// Additionally, there is a best effort not to execute `f()` once the context is canceled | ||
// and that happens whether or not a limit has been specified. | ||
func (g *EagerGroup) Go(f func() error) { | ||
if err := g.ctx.Err(); err != nil { | ||
g.errOnce.Do(func() { | ||
g.err = g.ctx.Err() | ||
g.cancel(g.err) | ||
}) | ||
return | ||
} | ||
|
||
if g.sem != nil { | ||
select { | ||
case <-g.ctx.Done(): | ||
g.errOnce.Do(func() { | ||
g.err = g.ctx.Err() | ||
g.cancel(g.err) | ||
}) | ||
return | ||
case g.sem <- struct{}{}: | ||
} | ||
} | ||
|
||
g.wg.Add(1) | ||
go func() { | ||
err := g.ctx.Err() | ||
if err == nil { | ||
err = f() | ||
} | ||
if err != nil { | ||
g.errOnce.Do(func() { | ||
g.err = err | ||
g.cancel(g.err) | ||
}) | ||
} | ||
if g.sem != nil { | ||
<-g.sem | ||
} | ||
g.wg.Done() | ||
}() | ||
} | ||
|
||
// Wait blocks until all function calls from the Go method have returned, then | ||
// returns the first non-nil error (if any) from them. | ||
func (g *EagerGroup) Wait() error { | ||
g.wg.Wait() | ||
g.cancel(g.err) | ||
return g.err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package sync | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestEagerGroupWithLimit(t *testing.T) { | ||
g, ctx := NewEagerGroup(context.Background(), 2) | ||
var count atomic.Int64 | ||
// One of the following three goroutines should DEFINITELY NOT be executed due to the limit of 2 and the context being cancelled. | ||
// The context should get cancelled automatically because the first two routines returned an error. | ||
g.Go(func() error { | ||
t.Log("one") | ||
count.Add(1) | ||
return fmt.Errorf("one") | ||
}) | ||
g.Go(func() error { | ||
t.Log("two") | ||
count.Add(1) | ||
return fmt.Errorf("two") | ||
}) | ||
g.Go(func() error { | ||
t.Log("three") | ||
count.Add(1) | ||
return fmt.Errorf("three") | ||
}) | ||
require.Error(t, g.Wait(), "We expect group.Wait() to return an error") | ||
ok := true | ||
select { | ||
case <-ctx.Done(): | ||
_, ok = <-ctx.Done() | ||
case <-time.After(time.Second): | ||
} | ||
require.False(t, ok, "We expect the context to be cancelled") | ||
require.True(t, 1 <= count.Load() && count.Load() <= 2, "We expect count to be between 1 and 2") | ||
} | ||
|
||
func TestEagerGroupWithNoLimit(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
g, ctx := NewEagerGroup(ctx, 0) | ||
funcCounter := &atomic.Int64{} | ||
|
||
go func() { | ||
for { | ||
if funcCounter.Load() > 10 { | ||
cancel() | ||
return | ||
} | ||
} | ||
}() | ||
|
||
for i := 0; i < 10000; i++ { | ||
g.Go(func() error { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
} | ||
funcCounter.Add(1) | ||
return nil | ||
}) | ||
} | ||
require.ErrorIs(t, g.Wait(), ctx.Err(), "We expect group.Wait() to return the context error") | ||
_, ok := <-ctx.Done() | ||
require.False(t, ok, "We expect the context to be cancelled") | ||
t.Log(funcCounter.Load(), "funcs executed") | ||
// We expect between 10 and 10000 funcs to be executed | ||
// because group tries to return early if context is cancelled | ||
require.Less( | ||
t, | ||
funcCounter.Load(), | ||
int64(10000), | ||
"Expected less than 1000 funcs to be executed", | ||
) | ||
} | ||
|
||
func TestNoInitEagerGroup(t *testing.T) { | ||
g := &EagerGroup{} | ||
f := func() error { return nil } | ||
require.Panics( | ||
t, | ||
func() { g.Go(f) }, | ||
"We expect a panic when calling Go on a group that has not been initialized with NewEagerGroup", | ||
) | ||
} |