From 96382077377ae9e25907bb36f6d09b05c5e54906 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 24 Dec 2024 07:38:37 -0500 Subject: [PATCH 1/2] sql/parser: parse CREATE LOGICALLY REPLICATED TABLE options seperately The two different CREATE LDR incancatations will require different options, so we should validate which options apply to which incantation at parsing time. Epic: none Release note (sql change): provide different options for CREATE LOGICALLY REPLICATED TABLE. --- ...create_logical_replication_stream_stmt.bnf | 2 +- docs/generated/sql/bnf/stmt_block.bnf | 20 ++++----- pkg/sql/parser/sql.y | 45 ++++++++++++++++++- .../testdata/create_logical_replication | 17 +++++++ 4 files changed, 69 insertions(+), 15 deletions(-) diff --git a/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf b/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf index cd0fce77ed59..57e8e8abccf1 100644 --- a/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf +++ b/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf @@ -1,2 +1,2 @@ create_logical_replication_stream_stmt ::= - 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options + 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 427bc6a14698..4c147f5a915a 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -621,7 +621,7 @@ create_external_connection_stmt ::= 'CREATE' 'EXTERNAL' 'CONNECTION' label_spec 'AS' string_or_placeholder create_logical_replication_stream_stmt ::= - 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options + 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options create_schedule_stmt ::= create_schedule_for_changefeed_stmt @@ -1879,9 +1879,9 @@ logical_replication_resources ::= | 'TABLES' '(' logical_replication_resources_list ')' | 'DATABASE' database_name -opt_logical_replication_options ::= - 'WITH' logical_replication_options_list - | 'WITH' 'OPTIONS' '(' logical_replication_options_list ')' +opt_logical_replication_create_table_options ::= + 'WITH' logical_replication_create_table_options_list + | 'WITH' 'OPTIONS' '(' logical_replication_create_table_options_list ')' | create_schedule_for_changefeed_stmt ::= @@ -2687,8 +2687,8 @@ target_elem ::= logical_replication_resources_list ::= ( db_object_name ) ( ( ',' db_object_name ) )* -logical_replication_options_list ::= - ( logical_replication_options ) ( ( ',' logical_replication_options ) )* +logical_replication_create_table_options_list ::= + ( logical_replication_create_table_options ) ( ( ',' logical_replication_create_table_options ) )* schedule_label_spec ::= label_spec @@ -3340,13 +3340,9 @@ bare_col_label ::= 'identifier' | bare_label_keywords -logical_replication_options ::= - 'CURSOR' '=' string_or_placeholder - | 'MODE' '=' string_or_placeholder - | 'DEFAULT' 'FUNCTION' '=' string_or_placeholder - | 'FUNCTION' db_object_name 'FOR' 'TABLE' db_object_name +logical_replication_create_table_options ::= + 'MODE' '=' string_or_placeholder | 'DISCARD' '=' string_or_placeholder - | 'SKIP' 'SCHEMA' 'CHECK' | 'LABEL' '=' string_or_placeholder common_table_expr ::= diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index ff3ad84e09e7..71416e650507 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1264,7 +1264,7 @@ func (u *sqlSymUnion) indexType() tree.IndexType { %type create_policy_stmt %type logical_replication_resources, logical_replication_resources_list -%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list +%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list opt_logical_replication_create_table_options logical_replication_create_table_options logical_replication_create_table_options_list %type create_stats_stmt %type <*tree.CreateStatsOptions> opt_create_stats_options @@ -4666,7 +4666,7 @@ create_logical_replication_stream_stmt: Options: *$11.logicalReplicationOptions(), } } -| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_options +| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_create_table_options { $$.val = &tree.CreateLogicalReplicationStream{ Into: $4.logicalReplicationResources(), @@ -4729,6 +4729,20 @@ opt_logical_replication_options: $$.val = &tree.LogicalReplicationOptions{} } +opt_logical_replication_create_table_options: + WITH logical_replication_create_table_options_list + { + $$.val = $2.logicalReplicationOptions() + } +| WITH OPTIONS '(' logical_replication_create_table_options_list ')' + { + $$.val = $4.logicalReplicationOptions() + } +| /* EMPTY */ + { + $$.val = &tree.LogicalReplicationOptions{} + } + logical_replication_options_list: // Require at least one option logical_replication_options @@ -4742,6 +4756,19 @@ logical_replication_options_list: } } +logical_replication_create_table_options_list: + // Require at least one option + logical_replication_create_table_options + { + $$.val = $1.logicalReplicationOptions() + } +| logical_replication_create_table_options_list ',' logical_replication_create_table_options + { + if err := $1.logicalReplicationOptions().CombineWith($3.logicalReplicationOptions()); err != nil { + return setErr(sqllex, err) + } + } + // List of valid logical replication options. logical_replication_options: CURSOR '=' string_or_placeholder @@ -4774,6 +4801,20 @@ logical_replication_options: $$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()} } +logical_replication_create_table_options: + MODE '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{Mode: $3.expr()} + } + | DISCARD '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()} + } +| LABEL '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()} + } + // %Help: CREATE VIRTUAL CLUSTER - create a new virtual cluster // %Category: Experimental // %Text: diff --git a/pkg/sql/parser/testdata/create_logical_replication b/pkg/sql/parser/testdata/create_logical_replication index 34c5d1edbce5..9522a3661192 100644 --- a/pkg/sql/parser/testdata/create_logical_replication +++ b/pkg/sql/parser/testdata/create_logical_replication @@ -22,6 +22,23 @@ CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') -- fully par CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' -- literals removed CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' -- identifiers removed +error +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000'; +---- +at or near "cursor": syntax error +DETAIL: source SQL: +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000' + ^ +HINT: try \h CREATE + +parse +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH MODE = 'immediate'; +---- +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH OPTIONS (MODE = 'immediate') -- normalized! +CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') WITH OPTIONS (MODE = ('immediate')) -- fully parenthesized +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' WITH OPTIONS (MODE = '_') -- literals removed +CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' WITH OPTIONS (MODE = 'immediate') -- identifiers removed + parse CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar; ---- From b8afadd276b7fe0759ff922b4c897554276c8845 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Mon, 6 Jan 2025 12:04:52 -0500 Subject: [PATCH 2/2] rac2: fix replicaState.desc access race The ReplicaID is a constant, so the race is fixed by duplicating the ReplicaID as an immutable field. Fixes #138033 Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 6824326f5c0d..58055ce41ba7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -1214,9 +1214,9 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // In a joint config (numSets == 2), config 0 may not need a replica // to force-flush, but the replica may also be in config 1 and be // force-flushing due to that (or vice versa). This complicates - // computeVoterDirectives, so under the assumption that joint configs - // are temporary, we don't bother stopping force flushes in that joint - // configs. + // computeVoterDirectivesRaftMuLocked, so under the assumption that + // joint configs are temporary, we don't bother stopping force flushes + // in that joint configs. noChangesNeeded = false } // Else, common case. @@ -1224,7 +1224,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra if noChangesNeeded { // Common case. } else { - rc.computeVoterDirectives(votersContributingToQuorum, quorumCounts) + rc.computeVoterDirectivesRaftMuLocked(votersContributingToQuorum, quorumCounts) } } @@ -1291,7 +1291,7 @@ type replicaScore struct { } // Second-pass decision-making. -func (rc *rangeController) computeVoterDirectives( +func (rc *rangeController) computeVoterDirectivesRaftMuLocked( votersContributingToQuorum [2]int, quorumCounts [2]int, ) { var scratchFFScores, scratchCandidateFFScores, scratchDenySendQScores [5]replicaScore @@ -1335,7 +1335,7 @@ func (rc *rangeController) computeVoterDirectives( sendTokens := rs.sendTokenCounter.tokens(admissionpb.ElasticWorkClass) bucketedSendTokens := (sendTokens / sendPoolBucket) * sendPoolBucket score := replicaScore{ - replicaID: rs.desc.ReplicaID, + replicaID: rs.replicaID, bucketedTokensSend: bucketedSendTokens, tokensEval: rs.evalTokenCounter.tokens(admissionpb.ElasticWorkClass), } @@ -1486,7 +1486,7 @@ func (rc *rangeController) HandleSchedulerEventRaftMuLocked( rc.opts.Scheduler.ScheduleControllerEvent(rc.opts.RangeID) } for _, rs := range nextScheduled { - rc.scheduledMu.replicas[rs.desc.ReplicaID] = struct{}{} + rc.scheduledMu.replicas[rs.replicaID] = struct{}{} } }() } @@ -1756,7 +1756,7 @@ func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) { rc.lastSendQueueStatsScratch.Clear() for _, rs := range rc.replicaMap { stats := ReplicaSendQueueStats{ - ReplicaID: rs.desc.ReplicaID, + ReplicaID: rs.replicaID, } if rs.sendStream != nil { func() { @@ -1983,13 +1983,17 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) { // replicaState holds state for each replica. All methods are called with // raftMu held, hence it does not have its own mutex. type replicaState struct { + // ==== Immutable fields ==== parent *rangeController // stream aggregates across the streams for the same (tenant, store). This // is the identity that is used to deduct tokens or wait for tokens to be // positive. stream kvflowcontrol.Stream evalTokenCounter, sendTokenCounter *tokenCounter - desc roachpb.ReplicaDescriptor + replicaID roachpb.ReplicaID + + // ==== Mutable fields ==== + desc roachpb.ReplicaDescriptor sendStream *replicaSendStream @@ -2063,6 +2067,7 @@ func NewReplicaState( stream: stream, evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream), sendTokenCounter: parent.opts.SSTokenCounter.Send(stream), + replicaID: desc.ReplicaID, desc: desc, } // Don't bother creating the replicaSendStream here. We will do this in @@ -2415,7 +2420,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( indexToSend: rss.mu.sendQueue.indexToSend, preventSendQNoForceFlush: false, } - if rs.desc.ReplicaID == rs.parent.leaseholder { + if rs.replicaID == rs.parent.leaseholder { if vss.noSendQ { // The first-pass itself decides that we need to send. vss.hasSendTokens = true @@ -2427,7 +2432,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( } return vss } - if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID { + if rs.replicaID == rs.parent.opts.LocalReplicaID { // Leader. vss.hasSendTokens = true return vss @@ -2574,7 +2579,7 @@ func (rs *replicaState) scheduledRaftMuLocked( ctx context.Context, mode RaftMsgAppMode, logSnapshot raft.LogSnapshot, ) (scheduleAgain bool, updateWaiterSets bool) { rs.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() - if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID { + if rs.replicaID == rs.parent.opts.LocalReplicaID { panic("scheduled called on the leader replica") } rss := rs.sendStream @@ -2639,9 +2644,9 @@ func (rs *replicaState) scheduledRaftMuLocked( if err == nil { var sent bool msg, sent = rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked( - rss.parent.desc.ReplicaID, slice) + rss.parent.replicaID, slice) if !sent { - err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID) + err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID) } } if err != nil { @@ -2742,7 +2747,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( n := len(event.sendingEntries) if n != 0 { panic(errors.AssertionFailedf("pull mode must not have sending entries (leader=%t)", - rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID)) + rss.parent.replicaID == rss.parent.parent.opts.LocalReplicaID)) } if directive.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty (these @@ -2755,7 +2760,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( } if wasExceedingInflightBytesThreshold && !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() { - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } } } else { @@ -2894,10 +2899,10 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( return false, err } msg, sent := rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked( - rss.parent.desc.ReplicaID, slice) + rss.parent.replicaID, slice) if !sent { return false, - errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID) + errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID) } rss.updateInflightRaftMuAndStreamLocked(slice) rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false) @@ -2984,7 +2989,7 @@ func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked( rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1) rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex if !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() { - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false) } @@ -3220,7 +3225,7 @@ func (rss *replicaSendStream) Notify(ctx context.Context) { } rss.mu.sendQueue.deductedForSchedulerTokens = tokens rss.parent.parent.opts.RangeControllerMetrics.SendQueue.DeductedForSchedulerBytes.Inc(int64(tokens)) - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } // NB: raftMu may or may not be held. Specifically, when called from Notify,