Skip to content

Commit

Permalink
Merge #134776
Browse files Browse the repository at this point in the history
134776: kvclient: prevent MuxRangeFeed call from hanging r=stevendanna a=stevendanna

The rangefeedMuxer holds a cache of `muxStreamOrError`. If the initial MuxRangeFeed call fails, the error is cached rather than a working client. That failure would be cached forever, and future attempts to initiate a rangefeed on that particular node would then return the cached error.

If all replicas for a particular range failed in this way, the rangefeed would hang forever, continuously "retrying" but only ever encountering the cached error.

Here, we remove items from the cache on error.

Epic: none
Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Nov 27, 2024
2 parents 97965d4 + 2f133a7 commit b77653f
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(

mux, err := client.MuxRangeFeed(ctx)
if err != nil {
// Remove the mux client from the cache if it hit an
// error.
m.muxClients.Delete(nodeID)
return future.MustSet(stream, muxStreamOrError{err: err})
}

Expand Down
110 changes: 109 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -300,6 +302,112 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
channelWaitWithTimeout(t, enoughErrors)
}

// TestMuxRangeFeedDoesNotStallOnError tests that the mux rangefeed
// client does not stall forever when all replicas return errors for
// the initial MuxRangeFeed call.
//
// The test installs a gRPC stream client interceptor that fails the
// first maxErrors MuxRangeFeed calls made after shouldError is set,
// then starts a rangefeed over a table whose ranges have been
// scattered away from n1 and waits for all inserted rows to be
// observed.
func TestMuxRangeFeedDoesNotStallOnError(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	const (
		rfMethod = "/cockroach.roachpb.Internal/MuxRangeFeed"

		// maxErrors is the total number of MuxRangeFeed calls that the
		// interceptor fails before letting calls through again.
		maxErrors = 10
	)
	var (
		// shouldError gates error injection so that cluster startup
		// and test setup are not disturbed.
		shouldError atomic.Bool
		// errCount counts intercepted MuxRangeFeed calls made while
		// shouldError is set. It is atomic because the interceptor is
		// installed on every server and may be invoked from multiple
		// goroutines concurrently.
		errCount atomic.Int64
	)

	streamInterceptor := func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor {
		return func(
			ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
			method string, streamer grpc.Streamer, opts ...grpc.CallOption,
		) (grpc.ClientStream, error) {
			if method == rfMethod && shouldError.Load() {
				// Fail exactly the first maxErrors calls; every later
				// call falls through to the real streamer.
				if n := errCount.Add(1); n <= maxErrors {
					return nil, errors.Newf("test error %d", n)
				}
			}
			return streamer(ctx, desc, cc, method, opts...)
		}
	}
	const numServers int = 3
	serverArgs := base.TestServerArgs{
		// Short, fixed backoff so that the retries triggered by the
		// injected errors complete quickly.
		RetryOptions: retry.Options{
			InitialBackoff: 10 * time.Millisecond,
			MaxBackoff:     10 * time.Millisecond,
		},
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				ContextTestingKnobs: rpc.ContextTestingKnobs{
					StreamClientInterceptor: streamInterceptor,
				},
			},
		},
	}

	tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ServerArgs: serverArgs})
	defer tc.Stopper().Stop(ctx)
	ts := tc.ApplicationLayer(0)

	startFrom := ts.Clock().Now()

	sqlDB := sqlutils.MakeSQLRunner(ts.SQLConn(t))

	// The goal here is to try to make sure that the rangefeed is forced
	// to connect to a remote node. Local connections don't go through
	// the server interceptors set up above.
	sqlDB.ExecMultiple(t,
		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
		`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
		`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
		`CREATE TABLE foo (key INT PRIMARY KEY)`,
	)

	// Waiting for the initial range to start removing replicas makes
	// the wait below substantially shorter.
	testutils.SucceedsSoon(t, func() error {
		var replicaCount int
		sqlDB.QueryRow(t,
			"SELECT sum(cardinality(replicas)) FROM [SHOW RANGES FROM TABLE foo WITH DETAILS]").
			Scan(&replicaCount)
		// Keep retrying while the table still has three or more
		// replicas in total, i.e. until down-replication has begun.
		if replicaCount >= 3 {
			return errors.Newf("too many replicas: %d", replicaCount)
		}
		return nil
	})

	sqlDB.ExecMultiple(t,
		`INSERT INTO foo (key) SELECT * FROM generate_series(1, 100)`,
		`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(10, 90, 10))`,
		`ALTER TABLE foo SCATTER`,
	)

	// We scatter and wait until we have at least one range without replicas on n1.
	testutils.SucceedsSoon(t, func() error {
		var nonLocalCount int
		sqlDB.QueryRow(t,
			"SELECT count(1) FROM [SHOW RANGES FROM TABLE foo WITH DETAILS] WHERE array_position(replicas, 1) IS NULL").
			Scan(&nonLocalCount)
		if nonLocalCount < 1 {
			return errors.New("at least one non-local range required for test")
		}
		return nil
	})

	fooDesc := desctestutils.TestingGetPublicTableDescriptor(
		ts.DB(), ts.Codec(), "defaultdb", "foo")
	fooSpan := fooDesc.PrimaryIndexSpan(ts.Codec())

	// Only start injecting errors once setup is complete, then verify
	// that the rangefeed still delivers all 100 inserted rows.
	shouldError.Store(true)
	allSeen, onValue := observeNValues(100)
	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startFrom, onValue)
	defer closeFeed()
	channelWaitWithTimeout(t, allSeen)
}

// Test to make sure the various metrics used by rangefeed are correctly
// updated during the lifetime of the rangefeed and when the rangefeed completes.
func TestRangeFeedMetricsManagement(t *testing.T) {
Expand All @@ -319,7 +427,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
// Insert 1000 rows, and split them into 10 ranges.
const numRanges = 10
sqlDB.ExecMultiple(t,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`,
Expand Down

0 comments on commit b77653f

Please sign in to comment.