Skip to content

Commit

Permalink
Simplify the code changes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 12, 2025
1 parent 3b24612 commit dbbd1ec
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 75 deletions.
90 changes: 30 additions & 60 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,7 @@ func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowN
return s.getWorkflowState(ctx, targetKeyspace, workflowName)
}

func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, *State, error) {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowActionOption) (*trafficSwitcher, *State, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName, opts...)
if err != nil {
s.Logger().Errorf("buildTrafficSwitcher failed: %v", err)
Expand All @@ -446,13 +442,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
IsPartialMigration: ts.isPartialMigration,
}

if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace {
if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil {
return nil, nil, err
}
return ts, state, nil
}

if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
// Nothing left to do.
return ts, state, nil
Expand Down Expand Up @@ -482,6 +471,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
if len(ts.Tables()) == 0 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName)
}
table := ts.Tables()[0]

if ts.IsMultiTenantMigration() {
// Deduce which traffic has been switched by looking at the current keyspace routing rules.
Expand All @@ -507,9 +497,28 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
} else {
if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
globalRules, err := topotools.GetRoutingRules(ctx, ts.TopoServer())
if err != nil {
return nil, nil, err
}
for _, table := range ts.Tables() {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
}
}
} else {
state.WorkflowType = TypeReshard
Expand Down Expand Up @@ -1239,7 +1248,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete")
defer span.Finish()

opts := []WorkflowOption{}
opts := []WorkflowActionOption{}
if req.IgnoreSourceKeyspace {
opts = append(opts, IgnoreSourceKeyspace())
}
Expand Down Expand Up @@ -2047,11 +2056,8 @@ func (s *Server) deleteTenantData(ctx context.Context, ts *trafficSwitcher, batc
})
}

func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, error) {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowActionOption) (*trafficSwitcher, error) {
wopts := processWorkflowActionOptions(opts)
tgtInfo, err := BuildTargets(ctx, s.ts, s.tmc, targetKeyspace, workflowName)
if err != nil {
s.Logger().Infof("Error building targets: %s", err)
Expand Down Expand Up @@ -2118,7 +2124,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}

if options.ignoreSourceKeyspace {
if wopts.ignoreSourceKeyspace {
continue
}
if _, ok := ts.sources[bls.Shard]; ok {
Expand Down Expand Up @@ -2153,7 +2159,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}
}
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace {
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && wopts.ignoreSourceKeyspace {
log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace)
return ts, nil
}
Expand Down Expand Up @@ -2262,12 +2268,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
return sw.logs(), nil
}

func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowOption) error {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
if !options.ignoreSourceKeyspace {
func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowActionOption) error {
wopts := processWorkflowActionOptions(opts)
if !wopts.ignoreSourceKeyspace {
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -3465,39 +3468,6 @@ func (s *Server) validateShardsHaveVReplicationPermissions(ctx context.Context,
return nil
}

func (s *Server) updateTablesTrafficState(ctx context.Context, state *State, tables []string) error {
// We assume a consistent state, so only choose routing rule for one table.
if len(tables) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", state.TargetKeyspace, state.Workflow)
}

var err error
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_RDONLY)
if err != nil {
return err
}
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_REPLICA)
if err != nil {
return err
}
globalRules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return err
}
for _, table := range tables {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", state.SourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
}
log.Errorf("DEBUG: state: %+v", state)
return err
}

func (s *Server) Logger() logutil.Logger {
if s.options.logger == nil {
s.options.logger = logutil.NewConsoleLogger() // Use default system logger
Expand Down
30 changes: 15 additions & 15 deletions go/vt/vtctl/workflow/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,37 @@ func WithLogger(l logutil.Logger) ServerOption {
})
}

// workflowOptions configure a workflow's optional behavior when
// workflowActionOptions configure a workflow's optional behavior when
// performing actions in the worfklow server.
// workflowOptions are set by the WorkflowOption values passed
// workflowActionOptions are set by the WorkflowActionOption values passed
// to the server functions.
type workflowOptions struct {
type workflowActionOptions struct {
ignoreSourceKeyspace bool
}

// WorkflowOption alters how we perform the certain workflow operations.
type WorkflowOption interface {
apply(*workflowOptions)
// WorkflowActionOption alters how we perform the certain workflow operations.
type WorkflowActionOption interface {
apply(*workflowActionOptions)
}

// funcWorkflowOption wraps a function that modifies workflowOptions into
// an implementation of the WorkflowOption interface.
type funcWorkflowOption struct {
f func(*workflowOptions)
// funcWorkflowActionOption wraps a function that modifies workflowActionOptions
// into an implementation of the WorkflowActionOption interface.
type funcWorkflowActionOption struct {
f func(*workflowActionOptions)
}

func (fwo *funcWorkflowOption) apply(wo *workflowOptions) {
func (fwo *funcWorkflowActionOption) apply(wo *workflowActionOptions) {
fwo.f(wo)
}

func newFuncWorkflowOption(f func(*workflowOptions)) *funcWorkflowOption {
return &funcWorkflowOption{
func newFuncWorkflowActionOption(f func(*workflowActionOptions)) *funcWorkflowActionOption {
return &funcWorkflowActionOption{
f: f,
}
}

func IgnoreSourceKeyspace() WorkflowOption {
return newFuncWorkflowOption(func(o *workflowOptions) {
func IgnoreSourceKeyspace() WorkflowActionOption {
return newFuncWorkflowActionOption(func(o *workflowActionOptions) {
o.ignoreSourceKeyspace = true
})
}
8 changes: 8 additions & 0 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,11 @@ func getVindexAndVSchema(ctx context.Context, ts *topo.Server, keyspace string,
}
return vindex, vschema, nil
}

func processWorkflowActionOptions(opts []WorkflowActionOption) workflowActionOptions {
var options workflowActionOptions
for _, o := range opts {
o.apply(&options)
}
return options
}

0 comments on commit dbbd1ec

Please sign in to comment.