Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
138244: sql/parser: parse CREATE LOGICALLY REPLICATED TABLE options separately r=dt a=msbutler

The two different CREATE LDR incantations will require different options, so we should validate which options apply to which incantation at parsing time.

Epic: none

Release note: none

138327: rac2: fix replicaState.desc access race r=stevendanna a=sumeerbhola

The ReplicaID is a constant, so the race is fixed by duplicating the ReplicaID as an immutable field.

Fixes #138033

Epic: CRDB-37515

Release note: None

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
3 people committed Jan 6, 2025
3 parents bf5b556 + 9638207 + b8afadd commit ef806f0
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
create_logical_replication_stream_stmt ::=
'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options
'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options
20 changes: 8 additions & 12 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ create_external_connection_stmt ::=
'CREATE' 'EXTERNAL' 'CONNECTION' label_spec 'AS' string_or_placeholder

create_logical_replication_stream_stmt ::=
'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_options
'CREATE' 'LOGICALLY' 'REPLICATED' logical_replication_resources 'FROM' logical_replication_resources 'ON' string_or_placeholder opt_logical_replication_create_table_options

create_schedule_stmt ::=
create_schedule_for_changefeed_stmt
Expand Down Expand Up @@ -1883,9 +1883,9 @@ logical_replication_resources ::=
| 'TABLES' '(' logical_replication_resources_list ')'
| 'DATABASE' database_name

opt_logical_replication_options ::=
'WITH' logical_replication_options_list
| 'WITH' 'OPTIONS' '(' logical_replication_options_list ')'
opt_logical_replication_create_table_options ::=
'WITH' logical_replication_create_table_options_list
| 'WITH' 'OPTIONS' '(' logical_replication_create_table_options_list ')'
|

create_schedule_for_changefeed_stmt ::=
Expand Down Expand Up @@ -2691,8 +2691,8 @@ target_elem ::=
logical_replication_resources_list ::=
( db_object_name ) ( ( ',' db_object_name ) )*

logical_replication_options_list ::=
( logical_replication_options ) ( ( ',' logical_replication_options ) )*
logical_replication_create_table_options_list ::=
( logical_replication_create_table_options ) ( ( ',' logical_replication_create_table_options ) )*

schedule_label_spec ::=
label_spec
Expand Down Expand Up @@ -3344,13 +3344,9 @@ bare_col_label ::=
'identifier'
| bare_label_keywords

logical_replication_options ::=
'CURSOR' '=' string_or_placeholder
| 'MODE' '=' string_or_placeholder
| 'DEFAULT' 'FUNCTION' '=' string_or_placeholder
| 'FUNCTION' db_object_name 'FOR' 'TABLE' db_object_name
logical_replication_create_table_options ::=
'MODE' '=' string_or_placeholder
| 'DISCARD' '=' string_or_placeholder
| 'SKIP' 'SCHEMA' 'CHECK'
| 'LABEL' '=' string_or_placeholder

common_table_expr ::=
Expand Down
45 changes: 25 additions & 20 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,17 +1214,17 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra
// In a joint config (numSets == 2), config 0 may not need a replica
// to force-flush, but the replica may also be in config 1 and be
// force-flushing due to that (or vice versa). This complicates
// computeVoterDirectives, so under the assumption that joint configs
// are temporary, we don't bother stopping force flushes in that joint
// configs.
// computeVoterDirectivesRaftMuLocked, so under the assumption that
// joint configs are temporary, we don't bother stopping force flushes
// in joint configs.
noChangesNeeded = false
}
// Else, common case.
}
if noChangesNeeded {
// Common case.
} else {
rc.computeVoterDirectives(votersContributingToQuorum, quorumCounts)
rc.computeVoterDirectivesRaftMuLocked(votersContributingToQuorum, quorumCounts)
}
}

Expand Down Expand Up @@ -1291,7 +1291,7 @@ type replicaScore struct {
}

// Second-pass decision-making.
func (rc *rangeController) computeVoterDirectives(
func (rc *rangeController) computeVoterDirectivesRaftMuLocked(
votersContributingToQuorum [2]int, quorumCounts [2]int,
) {
var scratchFFScores, scratchCandidateFFScores, scratchDenySendQScores [5]replicaScore
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func (rc *rangeController) computeVoterDirectives(
sendTokens := rs.sendTokenCounter.tokens(admissionpb.ElasticWorkClass)
bucketedSendTokens := (sendTokens / sendPoolBucket) * sendPoolBucket
score := replicaScore{
replicaID: rs.desc.ReplicaID,
replicaID: rs.replicaID,
bucketedTokensSend: bucketedSendTokens,
tokensEval: rs.evalTokenCounter.tokens(admissionpb.ElasticWorkClass),
}
Expand Down Expand Up @@ -1486,7 +1486,7 @@ func (rc *rangeController) HandleSchedulerEventRaftMuLocked(
rc.opts.Scheduler.ScheduleControllerEvent(rc.opts.RangeID)
}
for _, rs := range nextScheduled {
rc.scheduledMu.replicas[rs.desc.ReplicaID] = struct{}{}
rc.scheduledMu.replicas[rs.replicaID] = struct{}{}
}
}()
}
Expand Down Expand Up @@ -1756,7 +1756,7 @@ func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) {
rc.lastSendQueueStatsScratch.Clear()
for _, rs := range rc.replicaMap {
stats := ReplicaSendQueueStats{
ReplicaID: rs.desc.ReplicaID,
ReplicaID: rs.replicaID,
}
if rs.sendStream != nil {
func() {
Expand Down Expand Up @@ -1983,13 +1983,17 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) {
// replicaState holds state for each replica. All methods are called with
// raftMu held, hence it does not have its own mutex.
type replicaState struct {
// ==== Immutable fields ====
parent *rangeController
// stream aggregates across the streams for the same (tenant, store). This
// is the identity that is used to deduct tokens or wait for tokens to be
// positive.
stream kvflowcontrol.Stream
evalTokenCounter, sendTokenCounter *tokenCounter
desc roachpb.ReplicaDescriptor
replicaID roachpb.ReplicaID

// ==== Mutable fields ====
desc roachpb.ReplicaDescriptor

sendStream *replicaSendStream

Expand Down Expand Up @@ -2063,6 +2067,7 @@ func NewReplicaState(
stream: stream,
evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream),
sendTokenCounter: parent.opts.SSTokenCounter.Send(stream),
replicaID: desc.ReplicaID,
desc: desc,
}
// Don't bother creating the replicaSendStream here. We will do this in
Expand Down Expand Up @@ -2415,7 +2420,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked(
indexToSend: rss.mu.sendQueue.indexToSend,
preventSendQNoForceFlush: false,
}
if rs.desc.ReplicaID == rs.parent.leaseholder {
if rs.replicaID == rs.parent.leaseholder {
if vss.noSendQ {
// The first-pass itself decides that we need to send.
vss.hasSendTokens = true
Expand All @@ -2427,7 +2432,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked(
}
return vss
}
if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID {
if rs.replicaID == rs.parent.opts.LocalReplicaID {
// Leader.
vss.hasSendTokens = true
return vss
Expand Down Expand Up @@ -2574,7 +2579,7 @@ func (rs *replicaState) scheduledRaftMuLocked(
ctx context.Context, mode RaftMsgAppMode, logSnapshot raft.LogSnapshot,
) (scheduleAgain bool, updateWaiterSets bool) {
rs.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if rs.desc.ReplicaID == rs.parent.opts.LocalReplicaID {
if rs.replicaID == rs.parent.opts.LocalReplicaID {
panic("scheduled called on the leader replica")
}
rss := rs.sendStream
Expand Down Expand Up @@ -2639,9 +2644,9 @@ func (rs *replicaState) scheduledRaftMuLocked(
if err == nil {
var sent bool
msg, sent = rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked(
rss.parent.desc.ReplicaID, slice)
rss.parent.replicaID, slice)
if !sent {
err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID)
err = errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID)
}
}
if err != nil {
Expand Down Expand Up @@ -2742,7 +2747,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
n := len(event.sendingEntries)
if n != 0 {
panic(errors.AssertionFailedf("pull mode must not have sending entries (leader=%t)",
rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID))
rss.parent.replicaID == rss.parent.parent.opts.LocalReplicaID))
}
if directive.forceFlushStopIndex.active() {
// Must have a send-queue, so sendingEntries should stay empty (these
Expand All @@ -2755,7 +2760,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
}
if wasExceedingInflightBytesThreshold &&
!rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() {
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
rss.parent.parent.scheduleReplica(rss.parent.replicaID)
}
}
} else {
Expand Down Expand Up @@ -2894,10 +2899,10 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
return false, err
}
msg, sent := rss.parent.parent.opts.RaftInterface.SendMsgAppRaftMuLocked(
rss.parent.desc.ReplicaID, slice)
rss.parent.replicaID, slice)
if !sent {
return false,
errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID)
errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.replicaID)
}
rss.updateInflightRaftMuAndStreamLocked(slice)
rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false)
Expand Down Expand Up @@ -2984,7 +2989,7 @@ func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1)
rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex
if !rss.reachedInflightBytesThresholdRaftMuAndStreamLocked() {
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
rss.parent.parent.scheduleReplica(rss.parent.replicaID)
}
rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false)
}
Expand Down Expand Up @@ -3220,7 +3225,7 @@ func (rss *replicaSendStream) Notify(ctx context.Context) {
}
rss.mu.sendQueue.deductedForSchedulerTokens = tokens
rss.parent.parent.opts.RangeControllerMetrics.SendQueue.DeductedForSchedulerBytes.Inc(int64(tokens))
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
rss.parent.parent.scheduleReplica(rss.parent.replicaID)
}

// NB: raftMu may or may not be held. Specifically, when called from Notify,
Expand Down
45 changes: 43 additions & 2 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ func (u *sqlSymUnion) indexType() tree.IndexType {
%type <tree.Statement> create_policy_stmt

%type <tree.LogicalReplicationResources> logical_replication_resources, logical_replication_resources_list
%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list
%type <*tree.LogicalReplicationOptions> opt_logical_replication_options logical_replication_options logical_replication_options_list opt_logical_replication_create_table_options logical_replication_create_table_options logical_replication_create_table_options_list

%type <tree.Statement> create_stats_stmt
%type <*tree.CreateStatsOptions> opt_create_stats_options
Expand Down Expand Up @@ -4667,7 +4667,7 @@ create_logical_replication_stream_stmt:
Options: *$11.logicalReplicationOptions(),
}
}
| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_options
| CREATE LOGICALLY REPLICATED logical_replication_resources FROM logical_replication_resources ON string_or_placeholder opt_logical_replication_create_table_options
{
$$.val = &tree.CreateLogicalReplicationStream{
Into: $4.logicalReplicationResources(),
Expand Down Expand Up @@ -4730,6 +4730,20 @@ opt_logical_replication_options:
$$.val = &tree.LogicalReplicationOptions{}
}

// opt_logical_replication_create_table_options is the optional WITH clause
// accepted by the CREATE LOGICALLY REPLICATED form of the statement. The
// option list may be written bare (WITH opt, ...) or wrapped in
// WITH OPTIONS (opt, ...); both forms yield the options built by
// logical_replication_create_table_options_list. When the clause is omitted,
// an empty LogicalReplicationOptions is produced rather than nil.
opt_logical_replication_create_table_options:
WITH logical_replication_create_table_options_list
{
$$.val = $2.logicalReplicationOptions()
}
| WITH OPTIONS '(' logical_replication_create_table_options_list ')'
{
$$.val = $4.logicalReplicationOptions()
}
| /* EMPTY */
{
$$.val = &tree.LogicalReplicationOptions{}
}

logical_replication_options_list:
// Require at least one option
logical_replication_options
Expand All @@ -4743,6 +4757,19 @@ logical_replication_options_list:
}
}

// logical_replication_create_table_options_list is a comma-separated list of
// one or more logical_replication_create_table_options. Each additional
// option is folded into the options accumulated so far via CombineWith; an
// error returned by CombineWith is reported as a parse error through setErr.
logical_replication_create_table_options_list:
// Require at least one option
logical_replication_create_table_options
{
$$.val = $1.logicalReplicationOptions()
}
| logical_replication_create_table_options_list ',' logical_replication_create_table_options
{
// NOTE(review): $$ is not assigned in this action; presumably CombineWith
// merges $3 into $1 in place and the parser's default result ($1) carries
// the combined options forward -- confirm.
if err := $1.logicalReplicationOptions().CombineWith($3.logicalReplicationOptions()); err != nil {
return setErr(sqllex, err)
}
}

// List of valid logical replication options.
logical_replication_options:
CURSOR '=' string_or_placeholder
Expand Down Expand Up @@ -4775,6 +4802,20 @@ logical_replication_options:
$$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()}
}

// List of valid options for the CREATE LOGICALLY REPLICATED form of the
// statement: MODE, DISCARD and LABEL, each taking a string or placeholder.
// Every alternative yields a LogicalReplicationOptions with only the
// corresponding field set (LABEL populates MetricsLabel). Options not listed
// here, such as CURSOR, are rejected with a syntax error for this statement
// form.
logical_replication_create_table_options:
MODE '=' string_or_placeholder
{
$$.val = &tree.LogicalReplicationOptions{Mode: $3.expr()}
}
| DISCARD '=' string_or_placeholder
{
$$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()}
}
| LABEL '=' string_or_placeholder
{
$$.val = &tree.LogicalReplicationOptions{MetricsLabel: $3.expr()}
}

// %Help: CREATE VIRTUAL CLUSTER - create a new virtual cluster
// %Category: Experimental
// %Text:
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/parser/testdata/create_logical_replication
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') -- fully par
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' -- literals removed
CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' -- identifiers removed

error
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000';
----
at or near "cursor": syntax error
DETAIL: source SQL:
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH CURSOR = '1536242855577149065.0000000000'
^
HINT: try \h CREATE

parse
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH MODE = 'immediate';
----
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON 'uri' WITH OPTIONS (MODE = 'immediate') -- normalized!
CREATE LOGICALLY REPLICATED TABLE (foo) FROM TABLE (foo) ON ('uri') WITH OPTIONS (MODE = ('immediate')) -- fully parenthesized
CREATE LOGICALLY REPLICATED TABLE foo FROM TABLE foo ON '_' WITH OPTIONS (MODE = '_') -- literals removed
CREATE LOGICALLY REPLICATED TABLE _ FROM TABLE _ ON 'uri' WITH OPTIONS (MODE = 'immediate') -- identifiers removed

parse
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar;
----
Expand Down

0 comments on commit ef806f0

Please sign in to comment.