From f90bf4845a6c52587cc2784ad4afaa44b777d9e5 Mon Sep 17 00:00:00 2001 From: proost Date: Tue, 31 Oct 2023 19:53:08 +0900 Subject: [PATCH 1/2] refacdtor: distribute slots to replicas --- cluster.go | 14 +++-- cluster_test.go | 142 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 134 insertions(+), 22 deletions(-) diff --git a/cluster.go b/cluster.go index 48d41268..f6e95368 100644 --- a/cluster.go +++ b/cluster.go @@ -233,13 +233,17 @@ func (c *clusterClient) _refresh() (err error) { slots := [16384]conn{} for master, g := range groups { - addr := master - if c.opt.ReplicaOnly && len(g.nodes) > 1 { - addr = g.nodes[1+rand.Intn(len(g.nodes)-1)] - } - cc := conns[addr] + nodesCount := len(g.nodes) for _, slot := range g.slots { for i := slot[0]; i <= slot[1]; i++ { + var cc conn + if c.opt.ReplicaOnly && nodesCount > 1 { + addr := g.nodes[1+rand.Intn(nodesCount-1)] + cc = conns[addr] + } else { + cc = conns[master] + } + slots[i] = cc } } diff --git a/cluster_test.go b/cluster_test.go index 29b7dee2..0da1515d 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -63,6 +63,57 @@ var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ }}, }}, nil) +var slotsMultiRespWithMultiReplicas = newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '*', values: []RedisMessage{ + {typ: ':', integer: 0}, + {typ: ':', integer: 8192}, + {typ: '*', values: []RedisMessage{ // master + {typ: '+', string: "127.0.0.1"}, + {typ: ':', integer: 0}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica1 + {typ: '+', string: "127.0.0.2"}, + {typ: ':', integer: 1}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica2 + {typ: '+', string: "127.0.0.3"}, + {typ: ':', integer: 2}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica3 + {typ: '+', string: "127.0.0.4"}, + {typ: ':', integer: 3}, + {typ: '+', string: ""}, + }}, + }}, + {typ: '*', values: []RedisMessage{ + {typ: ':', integer: 8193}, + {typ: ':', integer: 16383}, + {typ: '*', values: []RedisMessage{ // master + {typ: '+', string: "127.0.1.1"}, + {typ: ':', integer: 0}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica1 + {typ: '+', string: "127.0.1.2"}, + {typ: ':', integer: 1}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica2 + {typ: '+', string: "127.0.1.3"}, + {typ: ':', integer: 2}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica3 + {typ: '+', string: "127.0.1.4"}, + {typ: ':', integer: 3}, + {typ: '+', string: ""}, + }}, + }}, +}}, nil) + var singleSlotResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ {typ: '*', values: []RedisMessage{ {typ: ':', integer: 0}, @@ -2273,25 +2324,82 @@ func TestClusterClientReplicaOnly_PickReplica(t *testing.T) { func TestClusterClientReplicaOnly_PickMasterIfNoReplica(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) - m := &mockConn{ - DoFn: func(cmd Completed) RedisResult { - if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { - return singleSlotResp - } - return RedisResult{} - }, - } + t.Run("replicas should be picked", func(t *testing.T) { + m := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiResp + } + return RedisResult{} + }, + } - client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { - copiedM := *m - return &copiedM + client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { + copiedM := *m + return &copiedM + }) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + if client.slots[0] != client.conns["127.0.1.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 0") + } + if client.slots[8192] != client.conns["127.0.1.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 8192") + } + if client.slots[8193] != client.conns["127.0.3.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 8193") + } + if client.slots[16383] != client.conns["127.0.3.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 16383") + } }) - if err != nil { - t.Fatalf("unexpected err %v", err) - } - t.Run("replicas should be picked", func(t *testing.T) { - if client.slots[0] != client.conns["127.0.0.1:0"] { - t.Fatalf("unexpected node assigned to slot 0") + + t.Run("distributed to replicas", func(t *testing.T) { + m := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiRespWithMultiReplicas + } + return RedisResult{} + }, + } + + client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { + copiedM := *m + return &copiedM + }) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + for slot := 0; slot < 8193; slot++ { + if client.slots[slot] == client.conns["127.0.0.2:1"] { + continue + } + if client.slots[slot] == client.conns["127.0.0.3:2"] { + continue + } + if client.slots[slot] == client.conns["127.0.0.4:3"] { + continue + } + + t.Fatalf("unexpected replica node assigned to slot %d", slot) + } + + for slot := 8193; slot < 16384; slot++ { + if client.slots[slot] == client.conns["127.0.1.2:1"] { + continue + } + if client.slots[slot] == client.conns["127.0.1.3:2"] { + continue + } + if client.slots[slot] == client.conns["127.0.1.4:3"] { + continue + } + + t.Fatalf("unexpected replica node assigned to slot %d", slot) } }) } From 29c152a60557313143ce5274611f263376381eec Mon Sep 17 00:00:00 2001 From: proost Date: Wed, 1 Nov 2023 16:29:35 +0900 Subject: [PATCH 2/2] style: change if-else branch --- cluster.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cluster.go b/cluster.go index f6e95368..772b5864 100644 --- a/cluster.go +++ b/cluster.go @@ -233,18 +233,18 @@ func (c *clusterClient) _refresh() (err error) { slots := [16384]conn{} for master, g := range groups { - nodesCount := len(g.nodes) - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - var cc conn - if c.opt.ReplicaOnly && nodesCount > 1 { - addr := g.nodes[1+rand.Intn(nodesCount-1)] - cc = conns[addr] - } else { - cc = conns[master] + if c.opt.ReplicaOnly && len(g.nodes) > 1 { + nodesCount := len(g.nodes) + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + slots[i] = conns[g.nodes[1+rand.Intn(nodesCount-1)]] + } + } + } else { + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + slots[i] = conns[master] } - - slots[i] = cc } } }