Skip to content

Commit

Permalink
fix: move aside TSM file on errBlockRead (#25839)
Browse files Browse the repository at this point in the history
The error type check for errBlockRead was incorrect,
and bad TSM files were not being moved aside when
that error was encountered. Use errors.Join,
errors.Is, and errors.As to correctly unwrap multiple
errors.

Closes #25838

(cherry picked from commit 8009704)

Closes #25840
  • Loading branch information
davidby-influx committed Jan 22, 2025
1 parent c82d4f8 commit c970b22
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 61 deletions.
35 changes: 27 additions & 8 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ type errBlockRead struct {
err error
}

func (e errBlockRead) Unwrap() error {
return e.err
}

func (e errBlockRead) Is(target error) bool {
_, ok := target.(errBlockRead)
return ok
}

func (e errBlockRead) Error() string {
if e.err != nil {
return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
Expand Down Expand Up @@ -1348,6 +1357,9 @@ type tsmBatchKeyIterator struct {
// errs is any error we received while iterating values.
errs TSMErrors

// errSet is the error strings we have seen before
errSet map[string]struct{}

// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
Expand Down Expand Up @@ -1393,13 +1405,18 @@ type tsmBatchKeyIterator struct {
overflowErrors int
}

// AppendError - store unique errors in the order of first appearance,
// up to a limit of maxErrors. If the error is unique and stored, return true.
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
s := err.Error()
if _, ok := t.errSet[s]; ok {
return true
} else if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
t.errSet[s] = struct{}{}
return true
} else {
// Was the error dropped
// Was the error dropped?
t.overflowErrors++
return false
}
Expand All @@ -1417,6 +1434,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
errSet: map[string]struct{}{},
size: size,
iterators: iter,
fast: fast,
Expand Down Expand Up @@ -1616,11 +1634,11 @@ func (k *tsmBatchKeyIterator) merge() {
}

func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %w", typ, k.key, err)})
}

func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %w", typ, k.key, err)})
}

func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
Expand All @@ -1647,6 +1665,8 @@ func (k *tsmBatchKeyIterator) Close() error {
for _, r := range k.readers {
errSlice = append(errSlice, r.Close())
}
clear(k.errSet)
k.errs = nil
return errors.Join(errSlice...)
}

Expand All @@ -1656,11 +1676,10 @@ func (k *tsmBatchKeyIterator) Err() error {
return nil
}
// Copy the errors before appending the dropped error count
var errs TSMErrors
errs = make([]error, 0, len(k.errs)+1)
errs := make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errs
return errors.Join(errs...)
}

type cacheKeyIterator struct {
Expand Down
21 changes: 10 additions & 11 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -294,19 +293,19 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.FileStore = ffs

files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))

}
require.Error(t, err, "expected error writing snapshot")
require.Zero(t, len(files), "no files should be compacted")

compactor.Open()

if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
t.Fatalf("expected error writing snapshot: %v", err)
}
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())

require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
})
}

// Ensures that a compaction will properly merge multiple TSM files
Expand Down
25 changes: 15 additions & 10 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2236,16 +2236,7 @@ func (s *compactionStrategy) compactGroup() {

log.Warn("Error compacting TSM files", zap.Error(err))

// We hit a bad TSM file - rename so the next compaction can proceed.
if _, ok := err.(errBlockRead); ok {
path := err.(errBlockRead).file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.Error((err)))
}
}
MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)

s.errorStat.Inc()
time.Sleep(time.Second)
Expand Down Expand Up @@ -2273,6 +2264,20 @@ func (s *compactionStrategy) compactGroup() {
zap.Int("tsm1_files_n", len(files)))
}

func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
var blockReadErr errBlockRead
// We hit a bad TSM file - rename so the next compaction can proceed.
if ok := errors.As(err, &blockReadErr); ok {
path := blockReadErr.file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
}
}
}

// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
Expand Down
70 changes: 38 additions & 32 deletions tsdb/engine/tsm1/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package tsm1

import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1841,8 +1841,22 @@ func TestTSMReader_References(t *testing.T) {
}

func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10

const testFile = "testFile.tsm"
errorCases := []error{
fmt.Errorf("test error 0"),
errBlockRead{
file: testFile,
err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v",
"string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))},
fmt.Errorf("test error 2"),
// Duplicate error - should be stored once, but not counted as dropped.
fmt.Errorf("test error 2"),
fmt.Errorf("test error 3"),
fmt.Errorf("test error 4"),
}

// Store all but the last error
MaxErrors := len(errorCases) - 2
dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
Expand All @@ -1853,41 +1867,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
require.NoError(t, fr.Close(), "unexpected error closing %s", name)
}()
} else {
defer func() {
require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name)
}()

t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
require.NoError(t, err, "unexpected error creating TSMReader for %s", name)
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
var i int
for i = 0; i < MaxErrors*2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
if i < MaxErrors && !saved {
t.Fatalf("error unexpectedly not saved: %d", i)
iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r)
require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator")

for i := 0; i < 2; i++ {
for _, e := range errorCases {
saved := iter.(*tsmBatchKeyIterator).AppendError(e)
savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap()
require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e)
}
if i >= MaxErrors && saved {
t.Fatalf("error unexpectedly saved: %d", i)
}
}
errs := iter.Err()
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) {
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
}
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors)
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 {
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error())
}
var blockReadError errBlockRead
iterErr := iter.Err()
joinErr, ok := iterErr.(interface{ Unwrap() []error })
require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr)
require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors")
require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err)
require.Equal(t, testFile, blockReadError.file, "unexpected file name in error")
}

func createTestTSM(t *testing.T) (dir string, name string) {
Expand Down

0 comments on commit c970b22

Please sign in to comment.