Skip to content

Commit

Permalink
Update unit tests and retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 13, 2025
1 parent 4ecba24 commit 47ef9af
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
44 changes: 37 additions & 7 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ const tabletPickerContextTimeout = 90 * time.Second
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

// partialJournalingParticipantsMsg is the fixed summary text that prefixes
// every partialJournalingParticipantsError message.
const partialJournalingParticipantsMsg = "not all journaling participants are in the stream"

// partialJournalingParticipantsError is a dedicated error type for the
// partial journaling participants condition, so that callers can recognize
// it by its type rather than by matching on the message text.
type partialJournalingParticipantsError struct {
	// msg is the summary portion of the error text.
	msg string
	// details is the caller-supplied context appended after the summary.
	details string
}

// Error implements the error interface. The result has the form
// "<msg>: <details>".
func (e *partialJournalingParticipantsError) Error() string {
	summary, extra := e.msg, e.details
	return fmt.Sprintf("%s: %s", summary, extra)
}

// NewPartialJournalingParticipantsError returns an error whose concrete type
// is *partialJournalingParticipantsError. Its message is
// partialJournalingParticipantsMsg followed by ": " and the given details.
func NewPartialJournalingParticipantsError(details string) error {
	e := new(partialJournalingParticipantsError)
	e.msg = partialJournalingParticipantsMsg
	e.details = details
	return e
}

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -784,15 +802,27 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
// Not having all journaling participants available is considered fatal and
// requires a new VStream.
if _, ok := vterrors.UnwrapAll(err).(*partialJournalingParticipantsError); ok {
return false, false
}

errCode := vterrors.Code(err)

// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
// This typically indicates that the user provided invalid arguments for the
// VStream so we should not retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT {
// But if there is a GTIDSet Mismatch on the tablet, omit that tablet from
// the candidate list in the TabletPicker and retry. The argument was invalid
// *for that specific tablet* but it's not generally invalid.
if strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}
return false, false
}

// If this is a recoverable/ephemeral error, then retry.
// For anything else, if this is a recoverable/ephemeral error then retry.
if !vreplication.IsUnrecoverableError(err) {
return true, false
}
Expand Down Expand Up @@ -938,7 +968,7 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
mode = matchAll
je.participants[inner] = false
case matchNone:
return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)
return nil, NewPartialJournalingParticipantsError(fmt.Sprintf("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids))
}
continue nextParticipant
}
Expand All @@ -947,7 +977,7 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
case undecided, matchNone:
mode = matchNone
case matchAll:
return nil, fmt.Errorf("not all journaling participants are in the stream: journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)
return nil, NewPartialJournalingParticipantsError(fmt.Sprintf("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids))
}
}
if mode == matchNone {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestVStreamRetriableErrors(t *testing.T) {
name: "failed precondition",
code: vtrpcpb.Code_FAILED_PRECONDITION,
msg: "",
shouldRetry: true,
shouldRetry: false,
ignoreTablet: false,
},
{
Expand All @@ -420,7 +420,7 @@ func TestVStreamRetriableErrors(t *testing.T) {
ignoreTablet: false,
},
{
name: "should not retry",
name: "invalid argument",
code: vtrpcpb.Code_INVALID_ARGUMENT,
msg: "final error",
shouldRetry: false,
Expand Down Expand Up @@ -928,7 +928,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Variable names are maintained like in OneToMany, but order is different.1
// Variable names are maintained like in OneToMany, but order is different.
ks := "TestVStream"
cell := "aa"
_ = createSandbox(ks)
Expand Down

0 comments on commit 47ef9af

Please sign in to comment.