Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782)

Signed-off-by: Rohit Nayak <[email protected]>
rohit-nayak-ps authored Feb 14, 2025
1 parent 420342f commit 70114ad
Showing 5 changed files with 24 additions and 12 deletions.
go/vt/topotools/mirror_rules.go (1 addition, 1 deletion)

@@ -55,7 +55,7 @@ func GetMirrorRules(ctx context.Context, ts *topo.Server) (map[string]map[string
 // SaveMirrorRules converts a mapping of fromTable=>[]toTables into a
 // vschemapb.MirrorRules protobuf message and saves it in the topology.
 func SaveMirrorRules(ctx context.Context, ts *topo.Server, rules map[string]map[string]float32) error {
-	log.Infof("Saving mirror rules %v\n", rules)
+	log.V(2).Infof("Saving mirror rules %v\n", rules)
 
 	rrs := &vschemapb.MirrorRules{Rules: make([]*vschemapb.MirrorRule, 0)}
 	for fromTable, mrs := range rules {
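The only change here demotes a routine message to verbosity level 2. Vitess's go/vt/log package follows glog-style leveled logging, where log.V(2).Infof emits only when the process runs with verbosity 2 or higher. A minimal standalone sketch of that gating, written against github.com/golang/glog directly (the flag wiring and sample data are illustrative, not part of this commit):

```go
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	// glog registers -v, -logtostderr, etc. on the default FlagSet in init().
	flag.Set("logtostderr", "true")
	flag.Set("v", "2") // raise verbosity so the V(2) line below appears
	flag.Parse()

	// Emitted unconditionally at Info severity.
	glog.Infof("Saving mirror rules")

	// Emitted only when verbosity >= 2: routine payload dumps stay out of
	// default logs but remain available when debugging.
	glog.V(2).Infof("Saving mirror rules %v",
		map[string]map[string]float32{"t1": {"t1_mirror": 50}})

	glog.Flush()
}
```

With -v=2 both lines print; dropping the flag.Set("v", "2") call suppresses the second one, which is the effect the diff above has on default logs.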
go/vt/vtctl/workflow/server.go (0 additions, 1 deletion)

@@ -2977,7 +2977,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			time.Sleep(lockTablesCycleDelay)
 		}
 	}
-
 	// Get the source positions now that writes are stopped, the streams were stopped (e.g.
 	// intra-keyspace materializations that write on the source), and we know for certain
 	// that any in progress writes are done.
go/vt/vtctl/workflow/traffic_switcher.go (18 additions, 7 deletions)

@@ -1154,9 +1154,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	if ctx.Err() != nil {
 		// Even though we create a new context later on we still record any context error:
 		// for forensics in case of failures.
-		ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
+		ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
 	}
-
+	ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
 	// We create a new context while canceling the migration, so that we are independent of the original
 	// context being canceled prior to or during the cancel operation itself.
 	// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
@@ -1168,20 +1168,27 @@
 	defer cmCancel()
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(cmCtx, true /* revert */)
+		if !ts.IsMultiTenantMigration() {
+			ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName())
+			err = ts.switchDeniedTables(cmCtx, true /* revert */)
+		} else {
+			ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName())
+		}
 	} else {
+		ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
 	if err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
 	}
 
 	if err := sm.CancelStreamMigrations(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
 	}
 
+	ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
 	err = ts.ForAllTargets(func(target *MigrationTarget) error {
 		query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
 			encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
@@ -1190,17 +1197,21 @@
 	})
 	if err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
 	}
 
+	ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
 	if err := ts.deleteReverseVReplication(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
 	}
 
 	if cancelErrs.HasErrors() {
+		ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
 		return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
 	}
+
+	ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
 	return nil
 }

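Two behaviors in the cancelMigration() diff are worth noting. First, the new IsMultiTenantMigration() branch is the substance of the fix: canceling a multi-tenant workflow no longer reverts denied tables on the target, presumably because that target keyspace is shared with other tenants' workflows (the diff itself only states the behavior, not the rationale). Second, each cleanup step records its error into cancelErrs and continues rather than returning early, so one failure does not stop the remaining teardown. A small sketch of that accumulate-and-continue pattern; runCleanup, step, and the step names are illustrative, not Vitess APIs:

```go
package main

import (
	"errors"
	"fmt"
)

type step struct {
	name string
	run  func() error
}

// runCleanup mirrors the shape of cancelMigration's error handling: every
// step runs, failures are collected, and the aggregate is returned only at
// the end, so one failing step never blocks the rest of the teardown.
func runCleanup(steps []step) error {
	var errs []error
	for _, s := range steps {
		if err := s.run(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", s.name, err))
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("cancel migration failed, manual cleanup work may be necessary: %w",
			errors.Join(errs...))
	}
	return nil
}

func main() {
	err := runCleanup([]step{
		{"revert denied tables", func() error { return nil }},
		{"cancel stream migrations", func() error { return errors.New("tablet unreachable") }},
		// Still runs even though the previous step failed.
		{"restart vreplication", func() error { return nil }},
	})
	fmt.Println(err)
}
```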
go/vt/vttablet/tabletmanager/vreplication/controller.go (2 additions, 1 deletion)

@@ -115,14 +115,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
 	}
 	blpStats.WorkflowConfig = workflowConfig.String()
 	ct.sourceTablet.Store(&topodatapb.TabletAlias{})
-	log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)
 
 	id, err := strconv.ParseInt(params["id"], 10, 32)
 	if err != nil {
 		return nil, err
 	}
 	ct.id = int32(id)
 	ct.workflow = params["workflow"]
+	log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)
+
 	ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), workflowConfig.MaxTimeToRetryError)
 
 	state := params["state"]
go/vt/vttablet/tabletmanager/vreplication/vplayer.go (3 additions, 2 deletions)

@@ -125,7 +125,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
 		settings.StopPos = pausePos
 		saveStop = false
 	}
-	log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
+	log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vr.id, vr.WorkflowName, settings.StartPos, settings.StopPos)
+	log.V(2).Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
 		vr.id, settings.StartPos, settings.StopPos, vr.source.Filter)
 	queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
 		return vr.dbClient.ExecuteWithRetry(ctx, sql)
@@ -266,7 +267,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
 // one. This allows for the apply thread to catch up more quickly if
 // a backlog builds up.
 func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
-	log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
+	log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
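As with the mirror-rules change, the bulky filter dump now sits behind log.V(2), while a concise line carrying the player id and workflow name logs unconditionally in both newVPlayer and fetchAndApply. Assuming the standard glog flag wiring that Vitess binaries inherit (an assumption, not something this diff shows), running the tablet with verbosity 2, e.g. vttablet --v=2, restores the detailed filter output.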
