Skip to content

Commit

Permalink
Merge #138297
Browse files Browse the repository at this point in the history
138297: crosscluster/logical: enable automatic bidi replication r=jeffswenson a=msbutler

This patch enables a user to create a bidirectional replication stream with one
command. When the user runs `CREATE LOGICALLY REPLICATED TABLE tabB FROM tabA
ON uriA  WITH BIDIRECTIONAL ON uriB`, the following will happen:
- on cluster B, during job planning, construct the reverse stream command while
  we still have access to the original LDR command. This command will have the
  following form: `CREATE LOGICAL REPLICATION STREAM FROM TABLE tabB ON 'uriB'
  INTO TABLE tabA WITH CURSOR = $1, PARENT = '{original job id}';`
- on cluster B, create table tabB and begin an offline scan, as normal
- on cluster B, once the offline scan completes, but before tabB is published,
  set up the reverse stream from B to A at a cursor time after the initial scan
  completes but before tabB has come online. This cursor time prevents data
  looping and ensures the replication of all future foreground writes.
- on cluster B, steady state replication begins.

Epic: none

Release note (sql change): when a user runs CREATE LOGICALLY REPLICATED TABLE,
they must specify one of the following options:
- UNIDIRECTIONAL: sets up a unidirectional stream with fast initial scan
- BIDIRECTIONAL ON {dest uri}: sets up a bidirectional stream from the original
  dest to the original source.

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Jan 13, 2025
2 parents 4f52c1d + 15876c2 commit f2d81ee
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 36 deletions.
6 changes: 6 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ unreserved_keyword ::=
| 'BATCH'
| 'BEFORE'
| 'BEGIN'
| 'BIDIRECTIONAL'
| 'BINARY'
| 'BUCKET_COUNT'
| 'BUNDLE'
Expand Down Expand Up @@ -1513,6 +1514,7 @@ unreserved_keyword ::=
| 'TYPE'
| 'TYPES'
| 'THROTTLING'
| 'UNIDIRECTIONAL'
| 'UNBOUNDED'
| 'UNCOMMITTED'
| 'UNKNOWN'
Expand Down Expand Up @@ -3360,6 +3362,8 @@ logical_replication_create_table_options ::=
'MODE' '=' string_or_placeholder
| 'DISCARD' '=' string_or_placeholder
| 'LABEL' '=' string_or_placeholder
| 'UNIDIRECTIONAL'
| 'BIDIRECTIONAL' 'ON' string_or_placeholder

common_table_expr ::=
table_alias_name opt_col_def_list_no_types 'AS' materialize_clause '(' preparable_stmt ')'
Expand Down Expand Up @@ -3774,6 +3778,7 @@ bare_label_keywords ::=
| 'BEFORE'
| 'BEGIN'
| 'BETWEEN'
| 'BIDIRECTIONAL'
| 'BIGINT'
| 'BINARY'
| 'BIT'
Expand Down Expand Up @@ -4302,6 +4307,7 @@ bare_label_keywords ::=
| 'TYPES'
| 'UNBOUNDED'
| 'UNCOMMITTED'
| 'UNIDIRECTIONAL'
| 'UNIQUE'
| 'UNKNOWN'
| 'UNLISTEN'
Expand Down
1 change: 1 addition & 0 deletions pkg/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
Expand Down
70 changes: 63 additions & 7 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -100,7 +101,7 @@ func createLogicalReplicationStreamPlanHook(
return pgerror.New(pgcode.InvalidParameterValue, "the same number of source and destination tables must be specified")
}

options, err := evalLogicalReplicationOptions(ctx, stmt.Options, exprEval, p)
options, err := evalLogicalReplicationOptions(ctx, stmt.Options, exprEval, p, stmt.CreateTable)
if err != nil {
return err
}
Expand Down Expand Up @@ -135,6 +136,7 @@ func createLogicalReplicationStreamPlanHook(
return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
}
}

resolvedDestObjects, err := resolveDestinationObjects(ctx, p, p.SessionData(), stmt.Into, stmt.CreateTable)
if err != nil {
return err
Expand Down Expand Up @@ -188,7 +190,8 @@ func createLogicalReplicationStreamPlanHook(
srcTableNames[i] = tb.String()
}
spec, err := client.CreateForTables(ctx, &streampb.ReplicationProducerRequest{
TableNames: srcTableNames,
TableNames: srcTableNames,
AllowOffline: options.ParentID != 0,
})
if err != nil {
return err
Expand Down Expand Up @@ -253,8 +256,23 @@ func createLogicalReplicationStreamPlanHook(
defaultConflictResolution = *cr
}

jobID := p.ExecCfg().JobRegistry.MakeJobID()
var reverseStreamCmd string
if stmt.CreateTable && options.BidirectionalURI() != "" {
// TODO: validate URI.

reverseStmt := *stmt
reverseStmt.From, reverseStmt.Into = reverseStmt.Into, reverseStmt.From
reverseStmt.CreateTable = false
reverseStmt.Options.BidirectionalURI = nil
reverseStmt.Options.ParentID = tree.NewStrVal(jobID.String())
reverseStmt.PGURL = tree.NewStrVal(options.BidirectionalURI())
reverseStmt.Options.Cursor = &tree.Placeholder{Idx: 0}
reverseStreamCmd = reverseStmt.String()
}

jr := jobs.Record{
JobID: p.ExecCfg().JobRegistry.MakeJobID(),
JobID: jobID,
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", resolvedDestObjects.TargetDescription(), cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
Expand All @@ -269,6 +287,8 @@ func createLogicalReplicationStreamPlanHook(
Mode: mode,
MetricsLabel: options.metricsLabel,
CreateTable: stmt.CreateTable,
ReverseStreamCommand: reverseStreamCmd,
ParentID: int64(options.ParentID),
},
Progress: progress,
}
Expand Down Expand Up @@ -448,9 +468,13 @@ func createLogicalReplicationStreamTypeCheck(
stmt.Options.Mode,
stmt.Options.MetricsLabel,
stmt.Options.Discard,
stmt.Options.BidirectionalURI,
stmt.Options.ParentID,
},
exprutil.Ints{stmt.Options.ParentID},
exprutil.Bools{
stmt.Options.SkipSchemaCheck,
stmt.Options.Unidirectional,
},
}
if err := exprutil.TypeCheck(ctx, "LOGICAL REPLICATION STREAM", p.SemaCtx(),
Expand All @@ -467,17 +491,20 @@ type resolvedLogicalReplicationOptions struct {
mode string
defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution
// Mapping of table name to function descriptor
userFunctions map[string]int32
discard string
skipSchemaCheck bool
metricsLabel string
userFunctions map[string]int32
discard string
skipSchemaCheck bool
metricsLabel string
bidirectionalURI string
ParentID catpb.JobID
}

func evalLogicalReplicationOptions(
ctx context.Context,
options tree.LogicalReplicationOptions,
eval exprutil.Evaluator,
p sql.PlanHookState,
createTable bool,
) (*resolvedLogicalReplicationOptions, error) {
r := &resolvedLogicalReplicationOptions{}
if options.Mode != nil {
Expand Down Expand Up @@ -562,6 +589,28 @@ func evalLogicalReplicationOptions(
if options.SkipSchemaCheck == tree.DBoolTrue {
r.skipSchemaCheck = true
}
if options.ParentID != nil {
parentID, err := eval.Int(ctx, options.ParentID)
if err != nil {
return nil, err
}
r.ParentID = catpb.JobID(parentID)
}
unidirectional := options.Unidirectional == tree.DBoolTrue

if options.BidirectionalURI != nil {
uri, err := eval.String(ctx, options.BidirectionalURI)
if err != nil {
return nil, err
}
r.bidirectionalURI = uri
}
if createTable && unidirectional && r.bidirectionalURI != "" {
return nil, errors.New("UNIDIRECTIONAL and BIDIRECTIONAL cannot be specified together")
}
if createTable && !unidirectional && r.bidirectionalURI == "" {
return nil, errors.New("either BIDIRECTIONAL or UNIDRECTIONAL must be specified")
}
return r, nil
}

Expand Down Expand Up @@ -627,3 +676,10 @@ func (r *resolvedLogicalReplicationOptions) SkipSchemaCheck() bool {
}
return r.skipSchemaCheck
}

// BidirectionalURI returns the URI that was supplied with the BIDIRECTIONAL
// option. It returns the empty string when the receiver is nil or when no such
// URI was set, so it is safe to call on a nil options pointer.
func (r *resolvedLogicalReplicationOptions) BidirectionalURI() string {
	if r != nil {
		return r.bidirectionalURI
	}
	return ""
}
62 changes: 54 additions & 8 deletions pkg/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ func (r *logicalReplicationResumer) ingest(
replicatedTimeAtStart = progress.ReplicatedTime
)

if err := r.maybePublishCreatedTables(ctx, jobExecCtx, progress, payload); err != nil {
return err
}

uris, err := r.getClusterUris(ctx, r.job, execCfg.InternalDB)
if err != nil {
return err
Expand All @@ -170,6 +166,14 @@ func (r *logicalReplicationResumer) ingest(
}
defer func() { _ = client.Close(ctx) }()

if err := r.maybeStartReverseStream(ctx, jobExecCtx, client); err != nil {
return err
}

if err := r.maybePublishCreatedTables(ctx, jobExecCtx); err != nil {
return err
}

asOf := replicatedTimeAtStart
if asOf.IsEmpty() {
asOf = payload.ReplicationStartTime
Expand All @@ -194,6 +198,8 @@ func (r *logicalReplicationResumer) ingest(
}); err != nil {
return err
}
// Update the local progress copy as it was just updated.
progress = r.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication

// TODO(azhu): add a flag to avoid recreating dlq tables during replanning
if !(payload.CreateTable && progress.ReplicatedTime.IsEmpty()) {
Expand Down Expand Up @@ -295,15 +301,55 @@ func (r *logicalReplicationResumer) ingest(
return err
}

// maybeStartReverseStream executes the job's stored reverse stream command
// through client and then records in the job progress that the reverse stream
// was started.
//
// It is a no-op (returns nil) unless all of the following hold:
//   - the job details carry a non-empty ReverseStreamCommand,
//   - the job progress has a ReplicatedTime that is set, and
//   - the progress has not already been marked StartedReverseStream.
//
// An error is returned if executing the command fails or if the progress
// update cannot be written.
func (r *logicalReplicationResumer) maybeStartReverseStream(
	ctx context.Context, jobExecCtx sql.JobExecContext, client streamclient.Client,
) error {
	// Work on local copies of progress and details, as the job's accessors for
	// them are gated behind a mutex.
	prog := r.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
	spec := r.job.Details().(jobspb.LogicalReplicationDetails)

	if spec.ReverseStreamCommand == "" {
		// This job was not asked to set up a reverse stream.
		return nil
	}
	if !prog.ReplicatedTime.IsSet() {
		// No replicated time has been recorded yet.
		return nil
	}
	if prog.StartedReverseStream {
		// The reverse stream was already started.
		return nil
	}

	// The reverse stream's cursor is the current clock reading: a system time
	// before source tables have been published but after they have been created
	// during this job's planning.
	cursor := jobExecCtx.ExecCfg().Clock.Now().AsOfSystemTime()
	err := client.ExecStatement(ctx, spec.ReverseStreamCommand, "start-reverse-stream", cursor)
	if err != nil {
		return errors.Wrapf(err, "failed to start reverse stream")
	}

	// TODO(msbutler): if the job exits before we write here but after setting up
	// the reverse stream, we will accidentally create a second reverse stream. To
	// prevent this, the PARENT option will be passed to the reverse stream, then
	// during ldr stream creation, the planhook checks if any job is already
	// running with this parent job id.
	markStarted := func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
		ldrProgress := md.Progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
		ldrProgress.StartedReverseStream = true
		ju.UpdateProgress(md.Progress)
		return nil
	}
	if err := r.job.NoTxn().Update(ctx, markStarted); err != nil {
		return err
	}

	log.Infof(ctx, "started reverse stream")
	return nil
}

func (r *logicalReplicationResumer) maybePublishCreatedTables(
ctx context.Context,
jobExecCtx sql.JobExecContext,
progress *jobspb.LogicalReplicationProgress,
details jobspb.LogicalReplicationDetails,
ctx context.Context, jobExecCtx sql.JobExecContext,
) error {

// Instantiate a local copy of progress and details as they are gated behind a mutex.
progress := r.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
details := r.job.Details().(jobspb.LogicalReplicationDetails)

if !(details.CreateTable && progress.ReplicatedTime.IsSet() && !progress.PublishedNewTables) {
return nil
}
if details.ReverseStreamCommand != "" && !progress.StartedReverseStream {
return errors.AssertionFailedf("attempting to publish descriptors before starting reverse stream")
}
return sql.DescsTxn(ctx, jobExecCtx.ExecCfg(), func(ctx context.Context, txn isql.Txn, descCol *descs.Collection) error {
b := txn.KV().NewBatch()
for i := range details.IngestedExternalCatalog.Tables {
Expand Down
43 changes: 40 additions & 3 deletions pkg/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func TestLogicalStreamIngestionJobWithCursor(t *testing.T) {
func TestCreateTables(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderDeadlock(t)
skip.UnderRace(t)
defer log.Scope(t).Close(t)

ctx := context.Background()
Expand All @@ -403,7 +404,7 @@ func TestCreateTables(t *testing.T) {
sqlB := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("b")))

var jobID jobspb.JobID
sqlB.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1", aURL.String()).Scan(&jobID)
sqlB.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1 WITH UNIDIRECTIONAL", aURL.String()).Scan(&jobID)

// Check LWW on initial scan data.
sqlA.Exec(t, "UPSERT INTO tab VALUES (1, 'howdy')")
Expand All @@ -416,14 +417,18 @@ func TestCreateTables(t *testing.T) {
// Ensure secondary index was replicated as well.
compareReplicatedTables(t, srv, "a", "b", "tab", sqlA, sqlB)
})

t.Run("pause initial scan", func(t *testing.T) {
sqlA.Exec(t, "CREATE DATABASE c")
sqlA.Exec(t, "CREATE TABLE tab2 (pk int primary key, payload string)")
sqlc := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("c")))
sqlc.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'logical_replication.after.retryable_error'")
defer func() {
sqlc.Exec(t, "RESET CLUSTER SETTING jobs.debug.pausepoints")
}()

var jobID jobspb.JobID
sqlc.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE tab2 FROM TABLE tab2 ON $1", aURL.String()).Scan(&jobID)
sqlc.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE tab2 FROM TABLE tab2 ON $1 WITH UNIDIRECTIONAL", aURL.String()).Scan(&jobID)
jobutils.WaitForJobToPause(t, sqlc, jobID)

// Verify created tables are not visible as we paused before publishing
Expand All @@ -450,13 +455,45 @@ func TestCreateTables(t *testing.T) {
// rerun LDR again. As you can see in the
// restore-on-fail-or-cancel-fast-drop test, setting this up is a pain, so I
// will address this in an upcoming pr.
sqlc.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE tab FROM TABLE tab ON $1", aURL.String()).Scan(&jobID)
sqlc.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE tab FROM TABLE tab ON $1 WITH UNIDIRECTIONAL", aURL.String()).Scan(&jobID)
jobutils.WaitForJobToPause(t, sqlc, jobID)

// Next, resume it and wait for the table and its dlq table to come online.
sqlc.Exec(t, "RESUME JOB $1", jobID)
sqlc.CheckQueryResultsRetry(t, "SELECT count(*) FROM [SHOW TABLES]", [][]string{{"2"}})
})
t.Run("bidi", func(t *testing.T) {
sqlA.Exec(t, "CREATE TABLE tab3 (pk int primary key, payload string)")
sqlA.Exec(t, "INSERT INTO tab3 VALUES (1, 'hello')")

sqlA.Exec(t, "CREATE DATABASE d")
sqlD := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("d")))
dURL, cleanup := srv.PGUrl(t, serverutils.DBName("d"))
defer cleanup()

var jobID jobspb.JobID
sqlD.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE tab3 FROM TABLE tab3 ON $1 WITH BIDIRECTIONAL ON $2", aURL.String(), dURL.String()).Scan(&jobID)
WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlD, jobID)
sqlD.CheckQueryResultsRetry(t, `SELECT count(*) FROM [SHOW LOGICAL REPLICATION JOBS] WHERE tables = '{a.public.tab3}'`, [][]string{{"1"}})
var reverseJobID jobspb.JobID
sqlD.QueryRow(t, "SELECT job_id FROM [SHOW LOGICAL REPLICATION JOBS] WHERE tables = '{a.public.tab3}'").Scan(&reverseJobID)
sqlD.Exec(t, "INSERT INTO tab3 VALUES (2, 'goodbye')")
sqlA.Exec(t, "INSERT INTO tab3 VALUES (3, 'brb')")
WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlD, jobID)
WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlD, reverseJobID)
compareReplicatedTables(t, srv, "a", "d", "tab3", sqlA, sqlD)
})
t.Run("create command errors", func(t *testing.T) {
sqlA.Exec(t, "CREATE TABLE tab4 (pk int primary key, payload string)")
sqlA.Exec(t, "INSERT INTO tab4 VALUES (1, 'hello')")

sqlA.Exec(t, "CREATE DATABASE e")
sqlE := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("e")))
eURL, cleanup := srv.PGUrl(t, serverutils.DBName("e"))
defer cleanup()
sqlE.ExpectErr(t, "either BIDIRECTIONAL or UNIDRECTIONAL must be specified", "CREATE LOGICALLY REPLICATED TABLE b.tab4 FROM TABLE tab4 ON $1", eURL.String())
sqlE.ExpectErr(t, "UNIDIRECTIONAL and BIDIRECTIONAL cannot be specified together", "CREATE LOGICALLY REPLICATED TABLE tab4 FROM TABLE tab4 ON $1 WITH BIDIRECTIONAL ON $2, UNIDIRECTIONAL", aURL.String(), eURL.String())
})
}

// TestLogicalStreamIngestionAdvancePTS tests that the producer side pts advances
Expand Down
2 changes: 1 addition & 1 deletion pkg/crosscluster/producer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables(
mutableTableDescs := make([]*tabledesc.Mutable, 0, len(req.TableNames))
tableIDs := make([]uint32, 0, len(req.TableNames))

externalCatalog, err := externalcatalog.ExtractExternalCatalog(ctx, r.resolver, r.txn, r.txn.Descriptors(), false /* includeOffline */, req.TableNames...)
externalCatalog, err := externalcatalog.ExtractExternalCatalog(ctx, r.resolver, r.txn, r.txn.Descriptors(), req.AllowOffline, req.TableNames...)
if err != nil {
return streampb.ReplicationProducerSpec{}, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/crosscluster/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Client interface {

PlanLogicalReplication(ctx context.Context, req streampb.LogicalReplicationPlanRequest) (LogicalReplicationPlan, error)
CreateForTables(ctx context.Context, req *streampb.ReplicationProducerRequest) (*streampb.ReplicationProducerSpec, error)
ExecStatement(ctx context.Context, cmd string, opname string, args ...interface{}) error

// Close releases all the resources used by this client.
Close(ctx context.Context) error
Expand Down
Loading

0 comments on commit f2d81ee

Please sign in to comment.