diff --git a/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf b/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf index cd0fce77ed59..57e8e8abccf1 100644 --- a/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf +++ b/docs/generated/sql/bnf/create_logical_replication_stream_stmt.bnf @@ -1,2 +1,2 @@ create_logical_replication_stream_stmt ::= - 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options + 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index d3b57175cf91..2ccb01d8b996 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -622,7 +622,7 @@ create_external_connection_stmt ::= 'CREATE' 'EXTERNAL' 'CONNECTION' label_spec 'AS' string_or_placeholder create_logical_replication_stream_stmt ::= - 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options + 'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options create_schedule_stmt ::= create_schedule_for_changefeed_stmt @@ -1883,9 +1883,9 @@ logical_replication_resources ::= | 'TABLES' '(' logical_replication_resources_list ')' | 'DATABASE' database_name -opt_logical_replication_options ::= - 'WITH' logical_replication_options_list - | 'WITH' 'OPTIONS' '(' logical_replication_options_list ')' +opt_logical_replication_create_table_options ::= + 'WITH' logical_replication_create_table_options_list + | 'WITH' 'OPTIONS' '(' logical_replication_create_table_options_list ')' | create_schedule_for_changefeed_stmt ::= @@ -2691,8 +2691,8 @@ target_elem ::= logical_replication_resources_list ::= ( db_object_name ) ( ( ',' db_object_name ) )* -logical_replication_options_list ::= - ( logical_replication_options ) ( ( ',' logical_replication_options ) )* +logical_replication_create_table_options_list ::= + ( logical_replication_create_table_options ) ( ( ',' logical_replication_create_table_options ) )* schedule_label_spec ::= label_spec @@ -3344,13 +3344,9 @@ bare_col_label ::= 'identifier' | bare_label_keywords -logical_replication_options ::= - 'CURSOR' '=' string_or_placeholder - | 'MODE' '=' string_or_placeholder - | 'DEFAULT' 'FUNCTION' '=' string_or_placeholder - | 'FUNCTION' db_object_name 'FOR' 'TABLE' db_object_name +logical_replication_create_table_options ::= + 'MODE' '=' string_or_placeholder | 'DISCARD' '=' string_or_placeholder - | 'SKIP' 'SCHEMA' 'CHECK' | 'LABEL' '=' string_or_placeholder common_table_expr ::= diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 6824326f5c0d..58055ce41ba7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -1214,9 +1214,9 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // In a joint config (numSets == 2), config 0 may not need a replica // to force-flush, but the replica may also be in config 1 and be // force-flushing due to that (or vice versa). This complicates - // computeVoterDirectives, so under the assumption that joint configs - // are temporary, we don't bother stopping force flushes in that joint - // configs. + // computeVoterDirectivesRaftMuLocked, so under the assumption that + // joint configs are temporary, we don't bother stopping force flushes + // in that joint configs. noChangesNeeded = false } // Else, common case. @@ -1224,7 +1224,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra if noChangesNeeded { // Common case. } else { - rc.computeVoterDirectives(votersContributingToQuorum, quorumCounts) + rc.computeVoterDirectivesRaftMuLocked(votersContributingToQuorum, quorumCounts) } } @@ -1291,7 +1291,7 @@ type replicaScore struct { } // Second-pass decision-making. -func (rc *rangeController) computeVoterDirectives( +func (rc *rangeController) computeVoterDirectivesRaftMuLocked( votersContributingToQuorum [2]int, quorumCounts [2]int, ) { var scratchFFScores, scratchCandidateFFScores, scratchDenySendQScores [5]replicaScore @@ -1335,7 +1335,7 @@ func (rc *rangeController) computeVoterDirectives( sendTokens := rs.sendTokenCounter.tokens(admissionpb.ElasticWorkClass) bucketedSendTokens := (sendTokens / sendPoolBucket) * sendPoolBucket score := replicaScore{ - replicaID: rs.desc.ReplicaID, + replicaID: rs.replicaID, bucketedTokensSend: bucketedSendTokens, tokensEval: rs.evalTokenCounter.tokens(admissionpb.ElasticWorkClass), } @@ -1486,7 +1486,7 @@ func (rc *rangeController) HandleSchedulerEventRaftMuLocked( rc.opts.Scheduler.ScheduleControllerEvent(rc.opts.RangeID) } for _, rs := range nextScheduled { - rc.scheduledMu.replicas[rs.desc.ReplicaID] = struct{}{} + rc.scheduledMu.replicas[rs.replicaID] = struct{}{} } }() } @@ -1756,7 +1756,7 @@ func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) { rc.lastSendQueueStatsScratch.Clear() for _, rs := range rc.replicaMap { stats := ReplicaSendQueueStats{ - ReplicaID: rs.desc.ReplicaID, + ReplicaID: rs.replicaID, } if rs.sendStream != nil { func() { @@ -1983,13 +1983,17 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) { // replicaState holds state for each replica. All methods are called with // raftMu held, hence it does not have its own mutex. type replicaState struct { + // ==== Immutable fields ==== parent *rangeController // stream aggregates across the streams for the same (tenant, store). This // is the identity that is used to deduct tokens or wait for tokens to be // positive. stream kvflowcontrol.Stream evalTokenCounter, sendTokenCounter *tokenCounter - desc roachpb.ReplicaDescriptor + replicaID roachpb.ReplicaID + + // ==== Mutable fields ==== + desc roachpb.ReplicaDescriptor sendStream *replicaSendStream @@ -2063,6 +2067,7 @@ func NewReplicaState( stream: stream, evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream), sendTokenCounter: parent.opts.SSTokenCounter.Send(stream), + replicaID: desc.ReplicaID, desc: desc, } // Don't bother creating the replicaSendStream here. We will do this in @@ -2415,7 +2420,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( indexToSend: rss.mu.sendQueue.indexToSend, preventSendQNoForceFlush: false, } - if rs.desc.ReplicaID == rs.parent.leaseholder { + if rs.replicaID == rs.parent.leaseholder { if vss.noSendQ { // The first-pass itself decides that we need to send. vss.hasSendTokens = true @@ -2427,7 +2432,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( } return vss } - if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID { + if rs.replicaID == rs.parent.opts.LocalReplicaID { // Leader. vss.hasSendTokens = true return vss @@ -2574,7 +2579,7 @@ func (rs *replicaState) scheduledRaftMuLocked( ctx context.Context, mode RaftMsgAppMode, logSnapshot raft.LogSnapshot, ) (scheduleAgain bool, updateWaiterSets bool) { rs.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() - if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID { + if rs.replicaID == rs.parent.opts.LocalReplicaID { panic("scheduled called on the leader replica") } rss := rs.sendStream @@ -2639,9 +2644,9 @@ func (rs *replicaState) scheduledRaftMuLocked( if err == nil { var sent bool msg, sent = rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked( - rss.parent.desc.ReplicaID, slice) + rss.parent.replicaID, slice) if !sent { - err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID) + err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID) } } if err != nil { @@ -2742,7 +2747,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( n := len(event.sendingEntries) if n != 0 { panic(errors.AssertionFailedf("pull mode must not have sending entries (leader=%t)", - rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID)) + rss.parent.replicaID == rss.parent.parent.opts.LocalReplicaID)) } if directive.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty (these @@ -2755,7 +2760,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( } if wasExceedingInflightBytesThreshold && !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() { - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } } } else { @@ -2894,10 +2899,10 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( return false, err } msg, sent := rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked( - rss.parent.desc.ReplicaID, slice) + rss.parent.replicaID, slice) if !sent { return false, - errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID) + errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID) } rss.updateInflightRaftMuAndStreamLocked(slice) rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false) @@ -2984,7 +2989,7 @@ func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked( rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1) rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex if !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() { - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false) } @@ -3220,7 +3225,7 @@ func (rss *replicaSendStream) Notify(ctx context.Context) { } rss.mu.sendQueue.deductedForSchedulerTokens = tokens rss.parent.parent.opts.RangeControllerMetrics.SendQueue.DeductedForSchedulerBytes.Inc(int64(tokens)) - rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) + rss.parent.parent.scheduleReplica(rss.parent.replicaID) } // NB: raftMu may or may not be held. Specifically, when called from Notify, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index ff5f3900edbc..93cc2ec5c619 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1264,7 +1264,7 @@ func (u *sqlSymUnion) indexType() tree.IndexType { %type create_policy_stmt %type logical_replication_resources, logical_replication_resources_list -%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list +%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list opt_logical_replication_create_table_options logical_replication_create_table_options logical_replication_create_table_options_list %type create_stats_stmt %type <*tree.CreateStatsOptions> opt_create_stats_options @@ -4667,7 +4667,7 @@ create_logical_replication_stream_stmt: Options: *$11.logicalReplicationOptions(), } } -| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_options +| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_create_table_options { $$.val = &tree.CreateLogicalReplicationStream{ Into: $4.logicalReplicationResources(), @@ -4730,6 +4730,20 @@ opt_logical_replication_options: $$.val = &tree.LogicalReplicationOptions{} } +opt_logical_replication_create_table_options: + WITH logical_replication_create_table_options_list + { + $$.val = $2.logicalReplicationOptions() + } +| WITH OPTIONS '(' logical_replication_create_table_options_list ')' + { + $$.val = $4.logicalReplicationOptions() + } +| /* EMPTY */ + { + $$.val = &tree.LogicalReplicationOptions{} + } + logical_replication_options_list: // Require at least one option logical_replication_options @@ -4743,6 +4757,19 @@ logical_replication_options_list: } } +logical_replication_create_table_options_list: + // Require at least one option + logical_replication_create_table_options + { + $$.val = $1.logicalReplicationOptions() + } +| logical_replication_create_table_options_list ',' logical_replication_create_table_options + { + if err := $1.logicalReplicationOptions().CombineWith($3.logicalReplicationOptions()); err != nil { + return setErr(sqllex, err) + } + } + // List of valid logical replication options. logical_replication_options: CURSOR '=' string_or_placeholder @@ -4775,6 +4802,20 @@ logical_replication_options: $$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()} } +logical_replication_create_table_options: + MODE '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{Mode: $3.expr()} + } + | DISCARD '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()} + } +| LABEL '=' string_or_placeholder + { + $$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()} + } + // %Help: CREATE VIRTUAL CLUSTER - create a new virtual cluster // %Category: Experimental // %Text: diff --git a/pkg/sql/parser/testdata/create_logical_replication b/pkg/sql/parser/testdata/create_logical_replication index 34c5d1edbce5..9522a3661192 100644 --- a/pkg/sql/parser/testdata/create_logical_replication +++ b/pkg/sql/parser/testdata/create_logical_replication @@ -22,6 +22,23 @@ CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') -- fully par CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' -- literals removed CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' -- identifiers removed +error +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000'; +---- +at or near "cursor": syntax error +DETAIL: source SQL: +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000' + ^ +HINT: try \h CREATE + +parse +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH MODE = 'immediate'; +---- +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH OPTIONS (MODE = 'immediate') -- normalized! +CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') WITH OPTIONS (MODE = ('immediate')) -- fully parenthesized +CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' WITH OPTIONS (MODE = '_') -- literals removed +CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' WITH OPTIONS (MODE = 'immediate') -- identifiers removed + parse CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar; ----