-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_retry.go
142 lines (122 loc) · 3.36 KB
/
async_retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package asyncretry
import (
"context"
"fmt"
"sync"
"github.com/avast/retry-go/v4"
)
type RetryableFunc func(ctx context.Context) error
type FinishFunc func(error)
type AsyncRetry interface {
// Do calls f in a new goroutine, and retry if necessary. When finished, `finish` is called regardless of success or failure exactly once.
// Non-nil error is always `ErrInShutdown` that will be returned when AsyncRetry is in shutdown.
Do(ctx context.Context, f RetryableFunc, finish FinishFunc, opts ...Option) error
// Shutdown gracefully shuts down AsyncRetry without interrupting any active `Do`.
// Shutdown works by first stopping to accept new `Do` request, and then waiting for all active `Do`'s background goroutines to be finished.
// Multiple call of Shutdown is OK.
Shutdown(ctx context.Context) error
}
type asyncRetry struct {
mu sync.RWMutex // guards wg and shutdownChan
wg sync.WaitGroup
shutdownChan chan struct{}
}
func NewAsyncRetry() AsyncRetry {
return &asyncRetry{
wg: sync.WaitGroup{},
shutdownChan: make(chan struct{}),
}
}
var ErrInShutdown = fmt.Errorf("AsyncRetry is in shutdown")
func (a *asyncRetry) Do(ctx context.Context, f RetryableFunc, finish FinishFunc, opts ...Option) error {
config := DefaultConfig
for _, opt := range opts {
opt(&config)
}
a.mu.RLock()
select {
case <-a.shutdownChan:
a.mu.RUnlock()
return ErrInShutdown
default:
}
a.wg.Add(1) // notice that this line should be in lock so that shutdown would not go ahead
a.mu.RUnlock()
go func() {
defer a.wg.Done() // Done should be called after `finish` returns
defer func() {
if recovered := recover(); recovered != nil {
var err = fmt.Errorf("panicking while AsyncRetry err: %v", recovered)
finish(err)
}
}()
var err = a.call(ctx, f, &config)
finish(err)
}()
return nil
}
func (a *asyncRetry) call(ctx context.Context, f RetryableFunc, config *Config) error {
ctx, cancel := context.WithCancel(WithoutCancel(ctx))
defer cancel()
noMoreRetryCtx, noMoreRetry := context.WithCancel(config.context)
defer noMoreRetry()
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-a.shutdownChan:
noMoreRetry()
if config.cancelWhenShutdown {
cancel()
}
case <-config.context.Done():
if config.cancelWhenConfigContextCanceled {
cancel()
}
case <-done: // release resources
}
}()
return retry.Do(
func() error {
if config.timeout > 0 {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, config.timeout)
defer timeoutCancel()
return f(timeoutCtx)
}
return f(ctx)
},
retry.Attempts(config.attempts),
retry.OnRetry(retry.OnRetryFunc(config.onRetry)),
retry.RetryIf(retry.RetryIfFunc(config.retryIf)),
retry.Context(noMoreRetryCtx),
retry.Delay(config.delay),
retry.MaxJitter(config.maxJitter),
)
}
func (a *asyncRetry) Shutdown(ctx context.Context) error {
a.mu.Lock()
select {
case <-a.shutdownChan: // Already closed.
default: // Guarded by a.mu
close(a.shutdownChan)
}
a.mu.Unlock()
ch := make(chan struct{})
go func() {
a.wg.Wait()
close(ch)
}()
select {
case <-ch:
case <-ctx.Done():
}
return ctx.Err()
}
// Unrecoverable wraps error.
func Unrecoverable(err error) error {
return retry.Unrecoverable(err)
}
// IsRecoverable checks if error is recoverable
func IsRecoverable(err error) bool {
return retry.IsRecoverable(err)
}