From dd7b4ce35165cc6b355298fc3ae8cc58e8679717 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:10:14 -0800 Subject: [PATCH] fix: move aside TSM file on errBlockRead (#25899) The error type check for errBlockRead was incorrect, and bad TSM files were not being moved aside when that error was encountered. Use errors.Join, errors.Is, and errors.As to correctly unwrap multiple errors. Closes https://github.com/influxdata/influxdb/issues/25838 (cherry picked from commit 800970490a715d2b6eb5a0a7fed7fb4e9707af5d) Closes https://github.com/influxdata/influxdb/issues/25840 --- tsdb/engine/tsm1/compact.go | 35 ++++++++++++---- tsdb/engine/tsm1/compact_test.go | 22 +++++----- tsdb/engine/tsm1/engine.go | 25 +++++++----- tsdb/engine/tsm1/reader_test.go | 70 +++++++++++++++++--------------- 4 files changed, 91 insertions(+), 61 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 486e46e6b62..bda31063666 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -86,6 +86,15 @@ type errBlockRead struct { err error } +func (e errBlockRead) Unwrap() error { + return e.err +} + +func (e errBlockRead) Is(target error) bool { + _, ok := target.(errBlockRead) + return ok +} + func (e errBlockRead) Error() string { if e.err != nil { return fmt.Sprintf("block read error on %s: %s", e.file, e.err) @@ -1348,6 +1357,9 @@ type tsmBatchKeyIterator struct { // errs is any error we received while iterating values. errs TSMErrors + // errSet is the error strings we have seen before + errSet map[string]struct{} + // indicates whether the iterator should choose a faster merging strategy over a more // optimally compressed one. If fast is true, multiple blocks will just be added as is // and not combined. In some cases, a slower path will need to be utilized even when @@ -1393,13 +1405,18 @@ type tsmBatchKeyIterator struct { overflowErrors int } +// AppendError - store unique errors in the order of first appearance, +// up to a limit of maxErrors. If the error is unique and stored, return true. func (t *tsmBatchKeyIterator) AppendError(err error) bool { - if t.maxErrors > len(t.errs) { + s := err.Error() + if _, ok := t.errSet[s]; ok { + return true + } else if t.maxErrors > len(t.errs) { t.errs = append(t.errs, err) - // Was the error stored? + t.errSet[s] = struct{}{} return true } else { - // Was the error dropped + // Was the error dropped? 
t.overflowErrors++ return false } @@ -1417,6 +1434,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s readers: readers, values: map[string][]Value{}, pos: make([]int, len(readers)), + errSet: map[string]struct{}{}, size: size, iterators: iter, fast: fast, @@ -1616,11 +1634,11 @@ func (k *tsmBatchKeyIterator) merge() { } func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) { - k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err)}) + k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %w", typ, k.key, err)}) } func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) { - k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err)}) + k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %w", typ, k.key, err)}) } func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) { @@ -1647,6 +1665,8 @@ func (k *tsmBatchKeyIterator) Close() error { for _, r := range k.readers { errSlice = append(errSlice, r.Close()) } + clear(k.errSet) + k.errs = nil return errors.Join(errSlice...) } @@ -1656,11 +1676,10 @@ func (k *tsmBatchKeyIterator) Err() error { return nil } // Copy the errors before appending the dropped error count - var errs TSMErrors - errs = make([]error, 0, len(k.errs)+1) + errs := make([]error, 0, len(k.errs)+1) errs = append(errs, k.errs...) errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors)) - return errs + return errors.Join(errs...) } type cacheKeyIterator struct { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index b0fe7908207..aa4bb478af7 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -8,13 +8,13 @@ import ( "os" "path/filepath" "sort" - "strings" "testing" "time" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -294,19 +294,19 @@ func TestCompactor_DecodeError(t *testing.T) { compactor.FileStore = ffs files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) - if err == nil { - t.Fatalf("expected error writing snapshot: %v", err) - } - if len(files) > 0 { - t.Fatalf("no files should be compacted: got %v", len(files)) - - } + require.Error(t, err, "expected error writing snapshot") + require.Zero(t, len(files), "no files should be compacted") compactor.Open() - if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") { - t.Fatalf("expected error writing snapshot: %v", err) - } + _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) + + require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") + tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error { + require.Equal(t, 1, len(strings)) + require.Equal(t, strings[0], f3) + return nil + }) } // Ensures that a compaction will properly merge multiple TSM files 
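
Note (reviewer sketch, not part of the patch): with errBlockRead gaining Unwrap and a type-only Is, and tsmBatchKeyIterator.Err now returning errors.Join(...), callers can no longer detect a bad TSM file with a bare type assertion on the returned error; they have to probe the wrapped/joined chain with errors.Is or errors.As, which is what the MoveTsmOnReadErr helper added to engine.go below does. The standalone Go sketch that follows uses only hypothetical names and values (blockReadErr, the file name, the message strings); it shows why the direct assertion fails on the aggregate while errors.Is and errors.As still find the block-read error and its file name.

package main

import (
	"errors"
	"fmt"
)

// blockReadErr mirrors the shape of errBlockRead in compact.go: an inner
// error plus the offending file. All names here are illustrative.
type blockReadErr struct {
	file string
	err  error
}

func (e blockReadErr) Error() string {
	return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
}

// Unwrap lets errors.Is / errors.As descend into the wrapped error.
func (e blockReadErr) Unwrap() error { return e.err }

// Is matches on the error type alone, ignoring field values, so
// errors.Is(err, blockReadErr{}) works as a pure type check.
func (e blockReadErr) Is(target error) bool {
	_, ok := target.(blockReadErr)
	return ok
}

func main() {
	inner := blockReadErr{
		file: "000001-01.tsm", // hypothetical file name
		err:  fmt.Errorf("decode error: %w", errors.New("not enough data for timestamp")),
	}
	// Aggregate several errors, as tsmBatchKeyIterator.Err now does.
	joined := errors.Join(errors.New("unrelated error"), inner)

	// A direct type assertion on the aggregate fails ...
	_, ok := joined.(blockReadErr)
	fmt.Println(ok) // false

	// ... but errors.Is and errors.As walk the joined/wrapped tree.
	fmt.Println(errors.Is(joined, blockReadErr{})) // true

	var bre blockReadErr
	if errors.As(joined, &bre) {
		fmt.Println(bre.file) // 000001-01.tsm
	}
}

The type-only Is lets errors.Is act as a type check without needing matching field values, while Unwrap preserves the original decode/encode error that the handleDecodeError/handleEncodeError changes now wrap with %w instead of %v.
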
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 1ea98446055..8bf53b4243a 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2236,16 +2236,7 @@ func (s *compactionStrategy) compactGroup() { log.Warn("Error compacting TSM files", zap.Error(err)) - // We hit a bad TSM file - rename so the next compaction can proceed. - if _, ok := err.(errBlockRead); ok { - path := err.(errBlockRead).file - log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err)) - if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil { - log.Info("Error removing bad TSM file", zap.Error(err)) - } else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil { - log.Info("Error renaming corrupt TSM file", zap.Error((err))) - } - } + MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback) s.errorStat.Inc() time.Sleep(time.Second) @@ -2273,6 +2264,20 @@ func (s *compactionStrategy) compactGroup() { zap.Int("tsm1_files_n", len(files))) } +func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) { + var blockReadErr errBlockRead + // We hit a bad TSM file - rename so the next compaction can proceed. + if ok := errors.As(err, &blockReadErr); ok { + path := blockReadErr.file + log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err)) + if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil { + log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err)) + } else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil { + log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err)) + } + } +} + // levelCompactionStrategy returns a compactionStrategy for the given level. // It returns nil if there are no TSM files to compact. func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy { diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index 2c61e116f5d..dfaf99ba98c 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -1,12 +1,12 @@ package tsm1 import ( + "errors" "fmt" "math" "os" "path/filepath" "sort" - "strings" "testing" "github.com/stretchr/testify/require" @@ -1841,8 +1841,22 @@ func TestTSMReader_References(t *testing.T) { } func TestBatchKeyIterator_Errors(t *testing.T) { - const MaxErrors = 10 - + const testFile = "testFile.tsm" + errorCases := []error{ + fmt.Errorf("test error 0"), + errBlockRead{ + file: testFile, + err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", + "string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))}, + fmt.Errorf("test error 2"), + // Duplicate error - should be stored once, but not counted as dropped. 
+ fmt.Errorf("test error 2"), + fmt.Errorf("test error 3"), + fmt.Errorf("test error 4"), + } + + // Store all but the last error + MaxErrors := len(errorCases) - 2 dir, name := createTestTSM(t) defer os.RemoveAll(dir) fr, err := os.Open(name) @@ -1853,41 +1867,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) { if err != nil { // Only have a deferred close if we could not create the TSMReader defer func() { - if e := fr.Close(); e != nil { - t.Fatalf("unexpected error closing %s: %v", name, e) - } + require.NoError(t, fr.Close(), "unexpected error closing %s", name) + }() + } else { + defer func() { + require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name) }() - - t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err) } - defer func() { - if e := r.Close(); e != nil { - t.Fatalf("error closing TSMReader for %s: %v", name, e) - } - }() + require.NoError(t, err, "unexpected error creating TSMReader for %s", name) interrupts := make(chan struct{}) var iter KeyIterator - if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil { - t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err) - } - var i int - for i = 0; i < MaxErrors*2; i++ { - saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i)) - if i < MaxErrors && !saved { - t.Fatalf("error unexpectedly not saved: %d", i) + iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r) + require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator") + + for i := 0; i < 2; i++ { + for _, e := range errorCases { + saved := iter.(*tsmBatchKeyIterator).AppendError(e) + savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap() + require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e) } - if i >= MaxErrors && saved { - t.Fatalf("error unexpectedly saved: %d", i) - } - } - errs := iter.Err() - if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) { - t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt) - } - expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors) - if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 { - t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error()) } + var blockReadError errBlockRead + iterErr := iter.Err() + joinErr, ok := iterErr.(interface{ Unwrap() []error }) + require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr) + require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors") + require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err) + require.Equal(t, testFile, blockReadError.file, "unexpected file name in error") } func createTestTSM(t *testing.T) (dir string, name string) {
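
Note (reviewer sketch, not part of the patch): the rewritten TestBatchKeyIterator_Errors counts the stored errors by asserting the iterator's joined error to interface{ Unwrap() []error }, because errors.Join returns an unexported concrete type. A minimal standalone sketch of that unwrapping, with made-up error messages:

package main

import (
	"errors"
	"fmt"
)

func main() {
	// errors.Join hides its concrete type, so callers that need the
	// individual errors assert on the interface shape instead.
	joined := errors.Join(
		errors.New("first stored error"),
		errors.New("second stored error"),
		fmt.Errorf("additional errors dropped: %d", 3),
	)

	if u, ok := joined.(interface{ Unwrap() []error }); ok {
		fmt.Println("stored errors:", len(u.Unwrap()))
		for i, e := range u.Unwrap() {
			fmt.Printf("  %d: %v\n", i, e)
		}
	}
}

Asserting on the interface shape keeps callers independent of the standard library's internal join type, which is why the test does the same rather than naming a concrete error type.
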