Skip to content

Commit

Permalink
Merge #136237
Browse files Browse the repository at this point in the history
136237: kvserver: ProcessSplitAfterRightHandSideHasBeenRemoved with all lease types r=iskettaneh a=iskettaneh

This commit allows the test TestProcessSplitAfterRightHandSideHasBeenRemoved to run with all lease types.

References: #133763

Release note: None

Co-authored-by: Ibrahim Kettaneh <[email protected]>
  • Loading branch information
craig[bot] and iskettaneh committed Nov 28, 2024
2 parents 1ae7e46 + 5f1cc43 commit f2ce52f
Show file tree
Hide file tree
Showing 2 changed files with 449 additions and 379 deletions.
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ type testClusterPartitionedRange struct {
partitionedNodeIdx int
partitioned bool
partitionedReplicas map[roachpb.ReplicaID]bool
partitionedStores map[roachpb.StoreID]bool
}
handlers []kvserver.IncomingRaftMessageHandler
}
Expand Down Expand Up @@ -316,6 +317,7 @@ func setupPartitionedRangeWithHandlers(
pr.mu.partitionedReplicas = map[roachpb.ReplicaID]bool{
replicaID: true,
}
pr.mu.partitionedStores = map[roachpb.StoreID]bool{}
for i := range tc.Servers {
s := i
h := &unreliableRaftHandler{
Expand Down Expand Up @@ -381,6 +383,40 @@ func setupPartitionedRangeWithHandlers(
}
pr.handlers = append(pr.handlers, h)
tc.Servers[s].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(tc.Target(s).StoreID, h)

// Also partition the store in the storeliveness layer.
pr.addStore(tc.Servers[partitionedNodeIdx].GetFirstStoreID())

shouldDropStoreLivenessMessage := func(from roachpb.StoreID, to roachpb.StoreID) bool {
pr.mu.RLock()
defer pr.mu.RUnlock()
// Drop all messages from/to partitioned stores.
return pr.mu.partitioned && (pr.mu.partitionedStores[from] || pr.mu.partitionedStores[to])
}

store, err := tc.Servers[s].GetStores().(*kvserver.Stores).
GetStore(tc.Servers[s].GetFirstStoreID())
if err != nil {
return nil, err
}

tc.Servers[s].StoreLivenessTransport().(*storeliveness.Transport).
ListenMessages(store.StoreID(), &storeliveness.UnreliableHandler{
MessageHandler: store.TestingStoreLivenessSupportManager(),
UnreliableHandlerFuncs: storeliveness.UnreliableHandlerFuncs{
DropStoreLivenessMsg: func(msg *storelivenesspb.Message) bool {
drop := shouldDropStoreLivenessMessage(msg.From.StoreID, msg.To.StoreID)
if drop {
log.Infof(context.Background(), "dropping StoreLiveness msg %s from store %d: to %d",
msg.Type, msg.From.StoreID, msg.To.StoreID)
} else {
log.Infof(context.Background(), "allowing StoreLiveness msg %s from store %d: to %d",
msg.Type, msg.From.StoreID, msg.To.StoreID)
}
return drop
},
},
})
}
return pr, nil
}
Expand All @@ -399,6 +435,19 @@ func (pr *testClusterPartitionedRange) addReplica(replicaID roachpb.ReplicaID) {
pr.mu.partitionedReplicas[replicaID] = true
}

func (pr *testClusterPartitionedRange) addStore(storeID roachpb.StoreID) {
pr.mu.Lock()
defer pr.mu.Unlock()
pr.mu.partitionedStores[storeID] = true
}

func (pr *testClusterPartitionedRange) removeStore(storeID roachpb.StoreID) {
pr.mu.Lock()
defer pr.mu.Unlock()

pr.mu.partitionedStores[storeID] = false
}

func (pr *testClusterPartitionedRange) extend(
tc *testcluster.TestCluster,
rangeID roachpb.RangeID,
Expand Down
Loading

0 comments on commit f2ce52f

Please sign in to comment.