Skip to content

Commit

Permalink
VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (
Browse files Browse the repository at this point in the history
#17772)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Feb 14, 2025
1 parent efa30d4 commit 420342f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/fk_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func TestFKExt(t *testing.T) {
setSidecarDBName("_vt")

// Ensure that there are multiple copy phase cycles per table.
extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal")
extraVTTabletArgs = append(extraVTTabletArgs,
"--vstream_packet_size=256",
"--queryserver-config-schema-change-signal",
parallelInsertWorkers)
extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4")
defer func() { extraVTTabletArgs = nil }()
initFKExtConfig(t)
Expand Down
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const testWorkflowFlavor = workflowFlavorVtctld
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
setSidecarDBName("_vt")
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
Expand Down Expand Up @@ -128,11 +129,14 @@ func TestFKWorkflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()

t11Count := getRowCount(t, vtgateConn, "t11")
t12Count := getRowCount(t, vtgateConn, "t12")
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
if withLoad {
t11Count := getRowCount(t, vtgateConn, "t11")
t12Count := getRowCount(t, vtgateConn, "t12")
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
}

}

func insertInitialFKData(t *testing.T) {
Expand Down
17 changes: 8 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers)))
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}

copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand All @@ -115,7 +111,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk)
tableName := resp.TableName
gtid = resp.Gtid

updateRowsCopied := func() error {
updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get())
_, err := vc.vr.dbClient.Execute(updateRowsQuery)
Expand Down Expand Up @@ -205,6 +200,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("copying table %s with lastpk %v", tableName, lastpkbv)
// Prepare a vcopierCopyTask for the current batch of work.
currCh := make(chan *vcopierCopyTaskResult, 1)

if parallelism > 1 {
resp = resp.CloneVT()
}
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk))

// Send result to the global resultCh and currCh. resultCh is used by
Expand Down Expand Up @@ -292,12 +291,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("Copy of %v stopped", state.currentTableName)
return fmt.Errorf("CopyAll was interrupted due to context expiration")
default:
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if copyWorkQueue != nil {
copyWorkQueue.close()
}
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if err := vc.updatePos(ctx, gtid); err != nil {
return err
}
Expand Down

0 comments on commit 420342f

Please sign in to comment.