// cluster.go (forked from mediocregopher/radix)

package radix

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/groupme/radix/resp"
	"github.com/groupme/radix/resp/resp2"
)

// dedupe is used to deduplicate a function invocation, so if multiple
// go-routines call it at the same time only the first will actually run it, and
// the others will block until that one is done.
type dedupe struct {
	l sync.Mutex
	s *sync.Once
}

func newDedupe() *dedupe {
	return &dedupe{s: new(sync.Once)}
}

func (d *dedupe) do(fn func()) {
	d.l.Lock()
	s := d.s
	d.l.Unlock()

	s.Do(func() {
		fn()
		d.l.Lock()
		d.s = new(sync.Once)
		d.l.Unlock()
	})
}
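
// An illustrative sketch (not from the original source) of how dedupe is meant
// to be used: if several goroutines call do concurrently, fn runs once and
// every caller returns only after that single run has finished. expensiveSync
// is a hypothetical stand-in for the deduplicated work.
//
//	d := newDedupe()
//	var wg sync.WaitGroup
//	for i := 0; i < 3; i++ {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			d.do(expensiveSync)
//		}()
//	}
//	wg.Wait()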

////////////////////////////////////////////////////////////////////////////////

// ClusterCanRetryAction is an Action which is aware of Cluster's retry behavior
// in the event of a slot migration. If an Action receives an error from a
// Cluster node which is either MOVED or ASK, and that Action implements
// ClusterCanRetryAction, and the ClusterCanRetry method returns true, then the
// Action will be retried on the correct node.
//
// NOTE that the Actions which are returned by Cmd, FlatCmd, and EvalScript.Cmd
// all implicitly implement this interface.
type ClusterCanRetryAction interface {
	Action
	ClusterCanRetry() bool
}
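
// A minimal sketch (not part of the original file) of a custom Action opting
// into the MOVED/ASK retry behavior described above; retryableAction is a
// hypothetical wrapper, and the embedded Action keeps its normal behavior.
//
//	type retryableAction struct {
//		Action
//	}
//
//	func (ra retryableAction) ClusterCanRetry() bool { return true }
//
// Passing retryableAction{Cmd(nil, "GET", "someKey")} to a Cluster's Do method
// would then let MOVED/ASK redirects be followed automatically.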

////////////////////////////////////////////////////////////////////////////////

type clusterOpts struct {
	pf        ClientFunc
	syncEvery time.Duration
}

// ClusterOpt is an optional behavior which can be applied to the NewCluster
// function to affect a Cluster's behavior.
type ClusterOpt func(*clusterOpts)

// ClusterPoolFunc tells the Cluster to use the given ClientFunc when creating
// pools of connections to cluster members.
func ClusterPoolFunc(pf ClientFunc) ClusterOpt {
	return func(co *clusterOpts) {
		co.pf = pf
	}
}

// ClusterSyncEvery tells the Cluster to synchronize itself with the cluster's
// topology at the given interval. On every synchronization Cluster will ask the
// cluster for its topology and make/destroy its connections as necessary.
func ClusterSyncEvery(d time.Duration) ClusterOpt {
	return func(co *clusterOpts) {
		co.syncEvery = d
	}
}
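
// A usage sketch (not from the original source) showing both options being
// passed to NewCluster; the address and the 30-second interval are arbitrary
// illustrative values.
//
//	c, err := NewCluster(
//		[]string{"127.0.0.1:7000"},
//		ClusterPoolFunc(DefaultClientFunc),
//		ClusterSyncEvery(30*time.Second),
//	)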

// Cluster contains all information about a redis cluster needed to interact
// with it, including a set of pools to each of its instances. All methods on
// Cluster are thread-safe.
type Cluster struct {
	co clusterOpts

	// used to deduplicate calls to sync
	syncDedupe *dedupe

	l              sync.RWMutex
	pools          map[string]Client
	primTopo, topo ClusterTopo

	closeCh   chan struct{}
	closeWG   sync.WaitGroup
	closeOnce sync.Once

	// Any errors encountered internally will be written to this channel. If
	// nothing is reading the channel the errors will be dropped. The channel
	// will be closed when Close is called.
	ErrCh chan error
}

// NewCluster initializes and returns a Cluster instance. It will try every
// address given until it finds a usable one. From there it uses CLUSTER SLOTS
// to discover the cluster topology and make all the necessary connections.
//
// NewCluster takes in a number of options which can override its default
// behavior. The default options NewCluster uses are:
//
//	ClusterPoolFunc(DefaultClientFunc)
//	ClusterSyncEvery(5 * time.Second)
//
func NewCluster(clusterAddrs []string, opts ...ClusterOpt) (*Cluster, error) {
	c := &Cluster{
		syncDedupe: newDedupe(),
		pools:      map[string]Client{},
		closeCh:    make(chan struct{}),
		ErrCh:      make(chan error, 1),
	}

	defaultClusterOpts := []ClusterOpt{
		ClusterPoolFunc(DefaultClientFunc),
		ClusterSyncEvery(5 * time.Second),
	}

	for _, opt := range append(defaultClusterOpts, opts...) {
		// the other args to NewCluster used to be a ClientFunc, which someone
		// might have left as nil; calling a nil option here would cause a
		// confusing panic, so just skip it.
		if opt != nil {
			opt(&(c.co))
		}
	}

	// make a pool to base the cluster on
	for _, addr := range clusterAddrs {
		p, err := c.co.pf("tcp", addr)
		if err != nil {
			continue
		}
		c.pools[addr] = p
		break
	}

	if err := c.Sync(); err != nil {
		for _, p := range c.pools {
			p.Close()
		}
		return nil, err
	}

	c.syncEvery(c.co.syncEvery)

	return c, nil
}
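
// An illustrative sketch (not in the original file): errors from the periodic
// background sync are reported on ErrCh and silently dropped when nobody is
// reading it, so a caller that wants visibility can drain the channel. The
// address and the log call are hypothetical placeholders.
//
//	c, err := NewCluster([]string{"127.0.0.1:7000"})
//	if err != nil {
//		// handle error
//	}
//	defer c.Close()
//
//	go func() {
//		for err := range c.ErrCh {
//			log.Printf("cluster error: %v", err)
//		}
//	}()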

func (c *Cluster) err(err error) {
	select {
	case c.ErrCh <- err:
	default:
	}
}

func assertKeysSlot(keys []string) error {
	var ok bool
	var prevKey string
	var slot uint16
	for _, key := range keys {
		thisSlot := ClusterSlot([]byte(key))
		if !ok {
			ok = true
		} else if slot != thisSlot {
			return fmt.Errorf("keys %q and %q do not belong to the same slot", prevKey, key)
		}
		prevKey = key
		slot = thisSlot
	}
	return nil
}

// may return nil, nil if no pool for the addr
func (c *Cluster) rpool(addr string) (Client, error) {
	c.l.RLock()
	defer c.l.RUnlock()
	if addr == "" {
		for _, p := range c.pools {
			return p, nil
		}
		return nil, errors.New("no pools available")
	} else if p, ok := c.pools[addr]; ok {
		return p, nil
	}
	return nil, nil
}

var errUnknownAddress = errors.New("unknown address")

// Client returns a Client for the given address, which could be either the
// primary or one of the secondaries (see Topo method for retrieving known
// addresses).
//
// NOTE that if there is a failover while a Client returned by this method is
// being used the Client may or may not continue to work as expected, depending
// on the nature of the failover.
//
// NOTE the Client should _not_ be closed.
func (c *Cluster) Client(addr string) (Client, error) {
	// rpool allows the address to be "", handle that case manually
	if addr == "" {
		return nil, errUnknownAddress
	}
	cl, err := c.rpool(addr)
	if err != nil {
		return nil, err
	} else if cl == nil {
		return nil, errUnknownAddress
	}
	return cl, nil
}
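
// A minimal sketch (not from the original source) of running a command against
// each known node individually, where c is an existing *Cluster and the INFO
// command is just an illustrative choice.
//
//	for _, node := range c.Topo() {
//		cl, err := c.Client(node.Addr)
//		if err != nil {
//			continue
//		}
//		var info string
//		if err := cl.Do(Cmd(&info, "INFO", "replication")); err != nil {
//			// handle error
//		}
//	}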

// If addr is "" this returns a random pool. If addr is given but there's no
// pool for it, one will be created on-the-fly.
func (c *Cluster) pool(addr string) (Client, error) {
	p, err := c.rpool(addr)
	if p != nil || err != nil {
		return p, err
	}

	// if the pool isn't available make it on-the-fly. This behavior isn't
	// _great_, but theoretically the syncEvery process should clean up any
	// extraneous pools which aren't really needed.

	// It's important that the cluster pool set isn't locked while this is
	// happening, because it could block for a while.
	if p, err = c.co.pf("tcp", addr); err != nil {
		return nil, err
	}

	// we've made a new pool, but we need to double-check someone else didn't
	// make one at the same time and add it in first. If they did, close this
	// one and return that one.
	c.l.Lock()
	if p2, ok := c.pools[addr]; ok {
		c.l.Unlock()
		p.Close()
		return p2, nil
	}
	c.pools[addr] = p
	c.l.Unlock()
	return p, nil
}

// Topo returns the Cluster's topology as it currently knows it. See
// ClusterTopo's docs for more on its default order.
func (c *Cluster) Topo() ClusterTopo {
	c.l.RLock()
	defer c.l.RUnlock()
	return c.topo
}

func (c *Cluster) getTopo(p Client) (ClusterTopo, error) {
	var tt ClusterTopo
	err := p.Do(Cmd(&tt, "CLUSTER", "SLOTS"))
	return tt, err
}

// Sync will synchronize the Cluster with the actual cluster, making new pools
// for new instances and removing ones for instances no longer in the cluster.
// This will be called periodically and automatically, but you can also call it
// manually at any time.
func (c *Cluster) Sync() error {
	p, err := c.pool("")
	if err != nil {
		return err
	}
	c.syncDedupe.do(func() {
		err = c.sync(p)
	})
	return err
}
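
// A brief sketch (not from the original source): a caller that has just
// performed a manual resharding might force an immediate topology refresh on
// an existing *Cluster c rather than wait for the next periodic sync.
//
//	if err := c.Sync(); err != nil {
//		// handle error
//	}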

// While this method is normally deduplicated by the Sync method's use of
// dedupe, it is perfectly thread-safe on its own and can be called whenever.
func (c *Cluster) sync(p Client) error {
	tt, err := c.getTopo(p)
	if err != nil {
		return err
	}

	for _, t := range tt {
		// call pool just to ensure one exists for this addr
		if _, err := c.pool(t.Addr); err != nil {
			return fmt.Errorf("error connecting to %s: %s", t.Addr, err)
		}
	}

	// this is a big bit of code to totally lock down the cluster for, but at
	// the same time Close _shouldn't_ block significantly.
	c.l.Lock()
	defer c.l.Unlock()
	c.topo = tt
	c.primTopo = tt.Primaries()

	tm := tt.Map()
	for addr, p := range c.pools {
		if _, ok := tm[addr]; !ok {
			p.Close()
			delete(c.pools, addr)
		}
	}

	return nil
}

func (c *Cluster) syncEvery(d time.Duration) {
	c.closeWG.Add(1)
	go func() {
		defer c.closeWG.Done()
		t := time.NewTicker(d)
		defer t.Stop()

		for {
			select {
			case <-t.C:
				if err := c.Sync(); err != nil {
					c.err(err)
				}
			case <-c.closeCh:
				return
			}
		}
	}()
}

func (c *Cluster) addrForKey(key string) string {
	s := ClusterSlot([]byte(key))
	c.l.RLock()
	defer c.l.RUnlock()
	for _, t := range c.primTopo {
		for _, slot := range t.Slots {
			if s >= slot[0] && s < slot[1] {
				return t.Addr
			}
		}
	}
	return ""
}

type askConn struct {
	Conn
}

func (ac askConn) Encode(m resp.Marshaler) error {
	if err := ac.Conn.Encode(Cmd(nil, "ASKING")); err != nil {
		return err
	}
	return ac.Conn.Encode(m)
}

func (ac askConn) Decode(um resp.Unmarshaler) error {
	if err := ac.Conn.Decode(resp2.Any{}); err != nil {
		return err
	}
	return ac.Conn.Decode(um)
}

func (ac askConn) Do(a Action) error {
	return a.Run(ac)
}
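
// An illustrative note (not from the original source): wrapping a Conn in
// askConn makes each Action's round trip effectively look like
//
//	ASKING
//	<the Action's own command>
//
// with the reply to ASKING read and discarded before the real reply is
// decoded, which is what a node expects after it has returned an ASK redirect.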

const doAttempts = 5

// Do performs an Action on a redis instance in the cluster, with the instance
// being determined by the keys returned from the Action's Keys() method.
//
// This method handles MOVED and ASK errors automatically in most cases; see
// ClusterCanRetryAction's docs for more.
func (c *Cluster) Do(a Action) error {
	var addr, key string
	keys := a.Keys()
	if len(keys) == 0 {
		// that's ok, key will then just be ""
	} else if err := assertKeysSlot(keys); err != nil {
		return err
	} else {
		key = keys[0]
		addr = c.addrForKey(key)
	}

	return c.doInner(a, addr, key, false, doAttempts)
}
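
// An illustrative sketch (not in the original file): a multi-key Action must
// keep all of its keys in one slot or assertKeysSlot will reject it, which is
// conventionally arranged with hash tags so that only the "{user:1}" portion
// of each key is hashed. c is an existing *Cluster and the key names are
// hypothetical.
//
//	err := c.Do(Cmd(nil, "MSET",
//		"{user:1}name", "alice",
//		"{user:1}email", "alice@example.com",
//	))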

func (c *Cluster) doInner(a Action, addr, key string, ask bool, attempts int) error {
	p, err := c.pool(addr)
	if err != nil {
		return err
	}

	// We only need to use WithConn if we want to send an ASKING command before
	// our Action a. If ask is false we can thus skip the WithConn call, which
	// avoids a few allocations, and execute our Action directly on p. This
	// helps with most calls since ask will only be true when a key gets
	// migrated between nodes.
	thisA := a
	if ask {
		thisA = WithConn(key, func(conn Conn) error {
			return askConn{conn}.Do(a)
		})
	}

	if err = p.Do(thisA); err == nil {
		return nil
	}

	// if the error was a MOVED or ASK we can potentially retry
	msg := err.Error()
	moved := strings.HasPrefix(msg, "MOVED ")
	ask = strings.HasPrefix(msg, "ASK ")
	if !moved && !ask {
		return err
	}

	// if we get an ASK there's no need to do a sync quite yet; we can continue
	// normally. But MOVED always prompts a sync. In the section after this one
	// we figure out what address to use based on the returned error, so the
	// sync isn't used _immediately_, but it still needs to happen.
	//
	// Also, even if the Action isn't a ClusterCanRetryAction we want a MOVED to
	// prompt a Sync.
	if moved {
		if serr := c.Sync(); serr != nil {
			return serr
		}
	}

	if ccra, ok := a.(ClusterCanRetryAction); !ok || !ccra.ClusterCanRetry() {
		return err
	}

	msgParts := strings.Split(msg, " ")
	if len(msgParts) < 3 {
		return fmt.Errorf("malformed MOVED/ASK error %q", msg)
	}
	addr = msgParts[2]

	if attempts--; attempts <= 0 {
		return errors.New("cluster action redirected too many times")
	}

	return c.doInner(a, addr, key, ask, attempts)
}

// Close cleans up all goroutines spawned by Cluster and closes all of its
// Pools.
func (c *Cluster) Close() error {
	closeErr := errClientClosed
	c.closeOnce.Do(func() {
		close(c.closeCh)
		c.closeWG.Wait()
		close(c.ErrCh)

		c.l.Lock()
		defer c.l.Unlock()
		var pErr error
		for _, p := range c.pools {
			if err := p.Close(); pErr == nil && err != nil {
				pErr = err
			}
		}
		closeErr = pErr
	})
	return closeErr
}