Skip to content

Commit

Permalink
fix: broken tx retries for cluster clients after #697
Browse files: browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Dec 23, 2024
1 parent 3b9473a commit f61ee99
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 32 deletions.
99 changes: 69 additions & 30 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
return false
}

func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) {
last := cmds.InitSlot
init := false

for _, cmd := range multi {
if cmd.Slot() == cmds.InitSlot {
Expand All @@ -550,7 +549,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
cc = c.pslots[cmd.Slot()]
}
if cc == nil {
return nil
return nil, false
}
count.m[cc]++
}
Expand All @@ -569,13 +568,13 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
cc = c.pslots[cmd.Slot()]
}
if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
return nil
return nil, false
}
re := retries.m[cc]
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
}
return retries
return retries, init
}

inits := 0
Expand All @@ -589,25 +588,28 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
} else if init && last != cmd.Slot() {
panic(panicMixCxSlot)
}
p := c.pslots[cmd.Slot()]
if p == nil {
return nil
cc := c.pslots[cmd.Slot()]
if cc == nil {
return nil, false
}
count.m[p]++
count.m[cc]++
}

if last == cmds.InitSlot {
// if all commands have no slots, such as INFO, we pick a non-nil slot.
for i, p := range c.pslots {
if p != nil {
for i, cc := range c.pslots {
if cc != nil {
last = uint16(i)
count.m[p] = inits
count.m[cc] = inits
break
}
}
if last == cmds.InitSlot {
return nil
return nil, false
}
} else if init {
cc := c.pslots[last]
count.m[cc] += inits
}

retries = connretryp.Get(len(count.m), len(count.m))
Expand All @@ -627,25 +629,34 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
}
return retries
return retries, init
}

func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, error) {
conns := c._pickMulti(multi)
func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) {
conns, hasInit := c._pickMulti(multi)
if conns == nil {
if err := c.refresh(ctx); err != nil {
return nil, err
return nil, false, err
}
if conns = c._pickMulti(multi); conns == nil {
return nil, ErrNoSlot
if conns, hasInit = c._pickMulti(multi); conns == nil {
return nil, false, ErrNoSlot
}
}
return conns, nil
return conns, hasInit, nil
}

// isMulti reports whether cmd is made of exactly one token and that token is
// the literal string "MULTI" (the comparison is case-sensitive).
func isMulti(cmd Completed) bool {
	tokens := cmd.Commands()
	if len(tokens) != 1 {
		return false
	}
	return tokens[0] == "MULTI"
}
// isExec reports whether cmd is made of exactly one token and that token is
// the literal string "EXEC" (the comparison is case-sensitive).
func isExec(cmd Completed) bool {
	tokens := cmd.Commands()
	if len(tokens) != 1 {
		return false
	}
	return tokens[0] == "EXEC"
}

func (c *clusterClient) doresultfn(
ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, hasInit bool,
) (clean bool) {
mi := -1
ei := -1
clean = true
for i, resp := range resps {
clean = clean && resp.NonRedisError() == nil
Expand All @@ -664,6 +675,37 @@ func (c *clusterClient) doresultfn(
} else {
nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
}
if hasInit && ei < i { // find out if there is a transaction block or not.
for mi = i; mi >= 0 && !isMulti(commands[mi]) && !isExec(commands[mi]); mi-- {
}
for ei = i; ei < len(commands) && !isMulti(commands[ei]) && !isExec(commands[ei]); ei++ {
}
if mi >= 0 && mi < ei && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) && resps[mi].val.string == "QUEUED" { // a transaction is found.
mu.Lock()
retries.Redirects++
nr := retries.m[nc]
if nr == nil {
nr = retryp.Get(0, len(commands))
retries.m[nc] = nr
}
for i := mi; i <= ei; i++ {
ii := cIndexes[i]
cm := commands[i]
if mode == RedirectAsk {
nr.aIndexes = append(nr.aIndexes, ii)
nr.cAskings = append(nr.cAskings, cm)
} else {
nr.cIndexes = append(nr.cIndexes, ii)
nr.commands = append(nr.commands, cm)
}
}
mu.Unlock()
continue // the transaction has been added to the retries, go to the next cmd.
}
}
if hasInit && mi < i && i < ei && mi >= 0 && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) {
continue // the current cmd is in the processed transaction and has been added to the retries.
}
mu.Lock()
if mode != RedirectRetry {
retries.Redirects++
Expand All @@ -690,17 +732,17 @@ func (c *clusterClient) doresultfn(
}

func (c *clusterClient) doretry(
ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool,
) {
clean := true
if len(re.commands) != 0 {
resps := cc.DoMulti(ctx, re.commands...)
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
resultsp.Put(resps)
}
if len(re.cAskings) != 0 {
resps := askingMulti(cc, ctx, re.cAskings)
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
resultsp.Put(resps)
}
if clean {
Expand All @@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
return nil
}

retries, err := c.pickMulti(ctx, multi)
retries, hasInit, err := c.pickMulti(ctx, multi)
if err != nil {
return fillErrs(len(multi), err)
}
Expand Down Expand Up @@ -742,18 +784,17 @@ retry:
}
for cc, re := range retries.m {
delete(retries.m, cc)
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit)
}
mu.Unlock()
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit)
wg.Wait()

if len(retries.m) != 0 {
if retries.Redirects > 0 {
retries.Redirects = 0
goto retry
}

if retries.RetryDelay >= 0 {
c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
attempts++
Expand Down Expand Up @@ -946,7 +987,6 @@ func (c *clusterClient) resultcachefn(
if !c.retry {
continue
}

retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
} else {
nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
Expand Down Expand Up @@ -1040,7 +1080,6 @@ retry:
retries.Redirects = 0
goto retry
}

if retries.RetryDelay >= 0 {
c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
attempts++
Expand Down
4 changes: 2 additions & 2 deletions syncp.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func (r *conncount) ResetLen(n int) {
type connretry struct {
m map[conn]*retry
n int
RetryDelay time.Duration // NOTE: This is not thread-safe.
Redirects uint32 // NOTE: This is not thread-safe.
RetryDelay time.Duration // NOTE: It is not thread-safe.
}

func (r *connretry) Capacity() int {
Expand All @@ -238,8 +238,8 @@ func (r *connretry) ResetLen(n int) {
type connretrycache struct {
m map[conn]*retrycache
n int
RetryDelay time.Duration // NOTE: This is not thread-safe.
Redirects uint32 // NOTE: This is not thread-safe.
RetryDelay time.Duration // NOTE: It is not thread-safe.
}

func (r *connretrycache) Capacity() int {
Expand Down

0 comments on commit f61ee99

Please sign in to comment.